release: 1.6.2

resolve #12
resolve #27
fix #39
fix #41
fix #47
This commit is contained in:
acgnhik 2022-04-24 14:59:04 +08:00
parent b4123e7982
commit c18c7bd400
18 changed files with 426 additions and 143 deletions

View File

@ -78,6 +78,10 @@ jobs:
working-directory: build working-directory: build
run: Copy-Item "${{ github.workspace }}\run.bat" -Destination ".\run.bat" run: Copy-Item "${{ github.workspace }}\run.bat" -Destination ".\run.bat"
- name: Copy run.ps1
working-directory: build
run: Copy-Item "${{ github.workspace }}\run.ps1" -Destination ".\run.ps1"
- name: Zip files - name: Zip files
run: | run: |
ls build ls build

View File

@ -1,5 +1,12 @@
# 更新日志 # 更新日志
## 1.6.2
- 忽略 Windows 注册表 JavaScript MIME 设置 (issue #12, 27)
- 修复 HLS 录制出错 (issue #39, 41)
- 修复 bug (issue #47)
- Windows 绿色版默认主机绑定 0.0.0.0 并加上 api key
## 1.6.1 ## 1.6.1
- 修复 bug (issue #37, 38, 40) - 修复 bug (issue #37, 38, 40)

View File

@ -129,7 +129,7 @@ sudo docker run \
例如:`blrec --host 0.0.0.0 --port 8000` 例如:`blrec --host 0.0.0.0 --port 8000`
### 安全保障 ### 网络安全
指定 `SSL` 证书使用 **https** 协议并指定 `api key` 可防止被恶意访问和泄漏设置里的敏感信息 指定 `SSL` 证书使用 **https** 协议并指定 `api key` 可防止被恶意访问和泄漏设置里的敏感信息
@ -141,6 +141,16 @@ sudo docker run \
如果在不信任的环境下,请使用浏览器的隐式模式访问。 如果在不信任的环境下,请使用浏览器的隐式模式访问。
### 关于 api-key
api key 可以使用数字和字母,长度限制为最短 8 最长 80。
3 次尝试内 api key 正确,客户端 ip 会自动加入白名单;3 次错误后则 ip 会被加入黑名单,进入黑名单后请求会被拒绝 (403)。
黑名单和白名单数以及同时尝试连接的 ip 数量限制各为 100,黑名单或白名单到达限制后不再接受除了白名单内的其它 ip。
只有重启才会清空黑名单和白名单。
## 作为 ASGI 应用运行 ## 作为 ASGI 应用运行
uvicorn blrec.web:app uvicorn blrec.web:app

27
run.bat
View File

@ -3,20 +3,33 @@ chcp 65001
set PATH=.\ffmpeg\bin;.\python;%PATH% set PATH=.\ffmpeg\bin;.\python;%PATH%
REM 不使用代理 @REM 不使用代理
set no_proxy=* set no_proxy=*
REM 默认本地主机和端口绑定 @REM 主机和端口绑定,可以按需修改。
set host=localhost set host=0.0.0.0
set port=2233 set port=2233
REM 服务器主机和端口绑定,去掉注释并按照自己的情况修改。 @REM 关于 api key
REM set host=0.0.0.0
REM set port=80 @REM api key 可以使用数字和字母,长度限制为最短 8 最长 80。
@REM 3 次尝试内 api key 正确客户端 ip 会自动加入白名单3 次错误后则 ip 会被加入黑名单,黑名单后请求会被拒绝 (403)。
@REM 黑名单和白名单数以及同时尝试连接的 ip 数量限制各为 100黑名单或白名单到达限制后不再接受除了白名单内的其它 ip 。
@REM 只有重启才会清空黑名单和白名单。
@REM 浏览器第一次访问会弹对话框要求输入 api key。
@REM 输入的 api key 会被保存在浏览器的 local storage下次使用同一浏览器不用再次输入。
@REM 请自行修改 api key不要使用默认的 api key。
set api_key=bili2233
set DEFAULT_LOG_DIR=日志文件 set DEFAULT_LOG_DIR=日志文件
set DEFAULT_OUT_DIR=录播文件 set DEFAULT_OUT_DIR=录播文件
python -m blrec -c settings.toml --open --host %host% --port %port% python -m blrec -c settings.toml --open --host %host% --port %port% --api-key %api_key%
pause pause

27
run.ps1 Normal file
View File

@ -0,0 +1,27 @@
chcp 65001
$env:PATH = ".\ffmpeg\bin;.\python;" + $env:PATH
# Disable HTTP(S) proxies for all hosts.
$env:no_proxy = "*"
# Host and port to bind; change as needed.
$env:host = "0.0.0.0"
$env:port = 2233
# About the api key:
# The api key may contain digits and letters; length must be between 8 and 80.
# If the api key is entered correctly within 3 attempts, the client ip is added to the whitelist; after 3 failures the ip is blacklisted and further requests are rejected (403).
# The blacklist, the whitelist and the number of concurrently connecting ips are each limited to 100; once a list is full, ips other than whitelisted ones are no longer accepted.
# The blacklist and whitelist are only cleared on restart.
# On first visit the browser shows a dialog asking for the api key.
# The entered api key is saved in the browser's local storage, so the same browser will not ask again.
# Please change the api key below; do not keep the default one.
$env:api_key = "bili2233"
$env:DEFAULT_LOG_DIR = "日志文件"
$env:DEFAULT_OUT_DIR = "录播文件"
python -m blrec -c settings.toml --open --host $env:host --port $env:port --api-key $env:api_key
pause

View File

@ -1,4 +1,4 @@
__prog__ = 'blrec' __prog__ = 'blrec'
__version__ = '1.6.1' __version__ = '1.6.2'
__github__ = 'https://github.com/acgnhiki/blrec' __github__ = 'https://github.com/acgnhiki/blrec'

View File

@ -15,7 +15,9 @@ from tenacity import (
from .api import AppApi, WebApi from .api import AppApi, WebApi
from .models import LiveStatus, RoomInfo, UserInfo from .models import LiveStatus, RoomInfo, UserInfo
from .typing import StreamFormat, QualityNumber, StreamCodec, ResponseData from .typing import (
ApiPlatform, StreamFormat, QualityNumber, StreamCodec, ResponseData
)
from .exceptions import ( from .exceptions import (
LiveRoomHidden, LiveRoomLocked, LiveRoomEncrypted, NoStreamAvailable, LiveRoomHidden, LiveRoomLocked, LiveRoomEncrypted, NoStreamAvailable,
NoStreamFormatAvailable, NoStreamCodecAvailable, NoStreamQualityAvailable, NoStreamFormatAvailable, NoStreamCodecAvailable, NoStreamQualityAvailable,
@ -177,12 +179,14 @@ class Live:
async def get_live_stream_urls( async def get_live_stream_urls(
self, self,
qn: QualityNumber = 10000, qn: QualityNumber = 10000,
*,
api_platform: ApiPlatform = 'android',
stream_format: StreamFormat = 'flv', stream_format: StreamFormat = 'flv',
stream_codec: StreamCodec = 'avc', stream_codec: StreamCodec = 'avc',
) -> List[str]: ) -> List[str]:
try: if api_platform == 'android':
info = await self._appapi.get_room_play_info(self._room_id, qn) info = await self._appapi.get_room_play_info(self._room_id, qn)
except Exception: else:
info = await self._webapi.get_room_play_info(self._room_id, qn) info = await self._webapi.get_room_play_info(self._room_id, qn)
self._check_room_play_info(info) self._check_room_play_info(info)

View File

@ -1,7 +1,10 @@
from typing import Any, Dict, Literal, Mapping from typing import Any, Dict, Literal, Mapping
Danmaku = Mapping[str, Any] ApiPlatform = Literal[
'web',
'android',
]
QualityNumber = Literal[ QualityNumber = Literal[
20000, # 4K 20000, # 4K
@ -26,3 +29,5 @@ StreamCodec = Literal[
JsonResponse = Dict[str, Any] JsonResponse = Dict[str, Any]
ResponseData = Dict[str, Any] ResponseData = Dict[str, Any]
Danmaku = Mapping[str, Any]

View File

@ -34,10 +34,11 @@ from .stream_analyzer import StreamProfile
from .statistics import StatisticsCalculator from .statistics import StatisticsCalculator
from ..event.event_emitter import EventListener, EventEmitter from ..event.event_emitter import EventListener, EventEmitter
from ..bili.live import Live from ..bili.live import Live
from ..bili.typing import StreamFormat, QualityNumber from ..bili.typing import ApiPlatform, StreamFormat, QualityNumber
from ..bili.helpers import get_quality_name from ..bili.helpers import get_quality_name
from ..flv.data_analyser import MetaData from ..flv.data_analyser import MetaData
from ..flv.stream_processor import StreamProcessor, BaseOutputFileManager from ..flv.stream_processor import StreamProcessor, BaseOutputFileManager
from ..utils.io import wait_for
from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin from ..utils.mixins import AsyncCooperationMixin, AsyncStoppableMixin
from ..path import escape_path from ..path import escape_path
from ..logging.room_id import aio_task_with_room_id from ..logging.room_id import aio_task_with_room_id
@ -102,7 +103,8 @@ class BaseStreamRecorder(
self._quality_number = quality_number self._quality_number = quality_number
self._real_stream_format: Optional[StreamFormat] = None self._real_stream_format: Optional[StreamFormat] = None
self._real_quality_number: Optional[QualityNumber] = None self._real_quality_number: Optional[QualityNumber] = None
self._use_candidate_stream: bool = False self._api_platform: ApiPlatform = 'android'
self._use_alternative_stream: bool = False
self.buffer_size = buffer_size or io.DEFAULT_BUFFER_SIZE # bytes self.buffer_size = buffer_size or io.DEFAULT_BUFFER_SIZE # bytes
self.read_timeout = read_timeout or 3 # seconds self.read_timeout = read_timeout or 3 # seconds
self.disconnection_timeout = disconnection_timeout or 600 # seconds self.disconnection_timeout = disconnection_timeout or 600 # seconds
@ -268,7 +270,8 @@ class BaseStreamRecorder(
self._stream_url = '' self._stream_url = ''
self._stream_host = '' self._stream_host = ''
self._stream_profile = {} self._stream_profile = {}
self._use_candidate_stream = False self._api_platform = 'android'
self._use_alternative_stream = False
self._connection_recovered.clear() self._connection_recovered.clear()
self._thread = Thread( self._thread = Thread(
target=self._run, name=f'StreamRecorder::{self._live.room_id}' target=self._run, name=f'StreamRecorder::{self._live.room_id}'
@ -287,6 +290,12 @@ class BaseStreamRecorder(
def _run(self) -> None: def _run(self) -> None:
raise NotImplementedError() raise NotImplementedError()
def _rotate_api_platform(self) -> None:
if self._api_platform == 'android':
self._api_platform = 'web'
else:
self._api_platform = 'android'
@retry( @retry(
reraise=True, reraise=True,
retry=retry_if_exception_type(( retry=retry_if_exception_type((
@ -300,11 +309,16 @@ class BaseStreamRecorder(
fmt = self._real_stream_format or self.stream_format fmt = self._real_stream_format or self.stream_format
logger.info( logger.info(
f'Getting the live stream url... qn: {qn}, format: {fmt}, ' f'Getting the live stream url... qn: {qn}, format: {fmt}, '
f'use_candidate_stream: {self._use_candidate_stream}' f'api platform: {self._api_platform}, '
f'use alternative stream: {self._use_alternative_stream}'
) )
try: try:
urls = self._run_coroutine( urls = self._run_coroutine(
self._live.get_live_stream_urls(qn, fmt) self._live.get_live_stream_urls(
qn,
api_platform=self._api_platform,
stream_format=fmt,
)
) )
except NoStreamQualityAvailable: except NoStreamQualityAvailable:
logger.info( logger.info(
@ -332,6 +346,10 @@ class BaseStreamRecorder(
except NoStreamCodecAvailable as e: except NoStreamCodecAvailable as e:
logger.warning(repr(e)) logger.warning(repr(e))
raise TryAgain raise TryAgain
except Exception as e:
logger.warning(f'Failed to get live stream urls: {repr(e)}')
self._rotate_api_platform()
raise TryAgain
else: else:
logger.info( logger.info(
f'Adopted the stream format ({fmt}) and quality ({qn})' f'Adopted the stream format ({fmt}) and quality ({qn})'
@ -339,17 +357,19 @@ class BaseStreamRecorder(
self._real_quality_number = qn self._real_quality_number = qn
self._real_stream_format = fmt self._real_stream_format = fmt
if not self._use_candidate_stream: if not self._use_alternative_stream:
url = urls[0] url = urls[0]
else: else:
try: try:
url = urls[1] url = urls[1]
except IndexError: except IndexError:
self._use_alternative_stream = False
self._rotate_api_platform()
logger.info( logger.info(
'No candidate stream url available, ' 'No alternative stream url available, will using the primary'
'will using the primary stream url instead.' f' stream url from {self._api_platform} api instead.'
) )
url = urls[0] raise TryAgain
logger.info(f"Got live stream url: '{url}'") logger.info(f"Got live stream url: '{url}'")
return url return url
@ -441,8 +461,14 @@ B站直播录像
class StreamProxy(io.RawIOBase): class StreamProxy(io.RawIOBase):
def __init__(self, stream: io.BufferedIOBase) -> None: def __init__(
self,
stream: io.BufferedIOBase,
*,
read_timeout: Optional[float] = None,
) -> None:
self._stream = stream self._stream = stream
self._read_timmeout = read_timeout
self._offset = 0 self._offset = 0
self._size_updates = Subject() self._size_updates = Subject()
@ -465,7 +491,14 @@ class StreamProxy(io.RawIOBase):
return True return True
def read(self, size: int = -1) -> bytes: def read(self, size: int = -1) -> bytes:
data = self._stream.read(size) if self._stream.closed:
raise EOFError
if self._read_timmeout:
data = wait_for(
self._stream.read, args=(size, ), timeout=self._read_timmeout
)
else:
data = self._stream.read(size)
self._offset += len(data) self._offset += len(data)
self._size_updates.on_next(len(data)) self._size_updates.on_next(len(data))
return data return data
@ -474,7 +507,14 @@ class StreamProxy(io.RawIOBase):
return self._offset return self._offset
def readinto(self, b: Any) -> int: def readinto(self, b: Any) -> int:
n = self._stream.readinto(b) if self._stream.closed:
raise EOFError
if self._read_timmeout:
n = wait_for(
self._stream.readinto, args=(b, ), timeout=self._read_timmeout
)
else:
n = self._stream.readinto(b)
self._offset += n self._offset += n
self._size_updates.on_next(n) self._size_updates.on_next(n)
return n return n

View File

@ -0,0 +1,3 @@
class FailedToFetchSegments(Exception):
    """Raised when fetching HLS stream segments keeps failing.

    Signals the main loop that the current stream URL is no longer
    usable (e.g. repeated 403/404 responses) and that a fresh live
    stream URL should be obtained.
    """
    pass

View File

@ -106,6 +106,7 @@ class FLVStreamRecorder(
except Exception as e: except Exception as e:
self._handle_exception(e) self._handle_exception(e)
finally: finally:
self._stopped = True
if self._stream_processor is not None: if self._stream_processor is not None:
self._stream_processor.finalize() self._stream_processor.finalize()
self._stream_processor = None self._stream_processor = None
@ -151,7 +152,6 @@ class FLVStreamRecorder(
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
self._handle_exception(e) self._handle_exception(e)
self._stopped = True
def _streaming_loop(self) -> None: def _streaming_loop(self) -> None:
url = self._get_live_stream_url() url = self._get_live_stream_url()
@ -177,13 +177,12 @@ class FLVStreamRecorder(
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
logger.warning(repr(e)) logger.warning(repr(e))
self._wait_for_connection_error() self._wait_for_connection_error()
except FlvDataError as e: except (FlvDataError, FlvStreamCorruptedError) as e:
logger.warning(repr(e)) logger.warning(repr(e))
self._use_candidate_stream = not self._use_candidate_stream if not self._use_alternative_stream:
url = self._get_live_stream_url() self._use_alternative_stream = True
except FlvStreamCorruptedError as e: else:
logger.warning(repr(e)) self._rotate_api_platform()
self._use_candidate_stream = not self._use_candidate_stream
url = self._get_live_stream_url() url = self._get_live_stream_url()
def _streaming(self, url: str) -> None: def _streaming(self, url: str) -> None:

View File

@ -3,12 +3,12 @@ import time
import errno import errno
import logging import logging
from queue import Queue, Empty from queue import Queue, Empty
from threading import Thread, Event, Lock from threading import Thread, Event, Lock, Condition
from datetime import datetime from datetime import datetime
from contextlib import suppress from contextlib import suppress
from urllib.parse import urlparse from urllib.parse import urlparse
from typing import Set, Optional from typing import List, Set, Optional
import urllib3 import urllib3
import requests import requests
@ -30,10 +30,12 @@ from tenacity import (
from .stream_remuxer import StreamRemuxer from .stream_remuxer import StreamRemuxer
from .stream_analyzer import ffprobe, StreamProfile from .stream_analyzer import ffprobe, StreamProfile
from .base_stream_recorder import BaseStreamRecorder, StreamProxy from .base_stream_recorder import BaseStreamRecorder, StreamProxy
from .exceptions import FailedToFetchSegments
from .retry import wait_exponential_for_same_exceptions, before_sleep_log from .retry import wait_exponential_for_same_exceptions, before_sleep_log
from ..bili.live import Live from ..bili.live import Live
from ..bili.typing import StreamFormat, QualityNumber from ..bili.typing import StreamFormat, QualityNumber
from ..flv.stream_processor import StreamProcessor from ..flv.stream_processor import StreamProcessor
from ..flv.exceptions import FlvDataError, FlvStreamCorruptedError
from ..utils.mixins import ( from ..utils.mixins import (
AsyncCooperationMixin, AsyncStoppableMixin, SupportDebugMixin AsyncCooperationMixin, AsyncStoppableMixin, SupportDebugMixin
) )
@ -81,6 +83,9 @@ class HLSStreamRecorder(
duration_limit=duration_limit, duration_limit=duration_limit,
) )
self._init_for_debug(self._live.room_id) self._init_for_debug(self._live.room_id)
self._init_section_data: Optional[bytes] = None
self._ready_to_fetch_segments = Condition()
self._failed_to_fetch_segments = Event()
self._stream_analysed_lock = Lock() self._stream_analysed_lock = Lock()
self._last_segment_uris: Set[str] = set() self._last_segment_uris: Set[str] = set()
@ -95,52 +100,54 @@ class HLSStreamRecorder(
) )
self._playlist_debug_file = open(path, 'wt', encoding='utf-8') self._playlist_debug_file = open(path, 'wt', encoding='utf-8')
with StreamRemuxer(self._live.room_id) as self._stream_remuxer: self._session = requests.Session()
with requests.Session() as self._session: self._session.headers.update(self._live.headers)
self._session.headers.update(self._live.headers)
self._segment_queue: Queue[Segment] = Queue(maxsize=1000) self._stream_remuxer = StreamRemuxer(self._live.room_id)
self._segment_data_queue: Queue[bytes] = Queue(maxsize=100) self._segment_queue: Queue[Segment] = Queue(maxsize=1000)
self._stream_host_available = Event() self._segment_data_queue: Queue[bytes] = Queue(maxsize=100)
self._stream_host_available = Event()
self._segment_fetcher_thread = Thread( self._segment_fetcher_thread = Thread(
target=self._run_segment_fetcher, target=self._run_segment_fetcher,
name=f'SegmentFetcher::{self._live.room_id}', name=f'SegmentFetcher::{self._live.room_id}',
daemon=True, daemon=True,
) )
self._segment_fetcher_thread.start() self._segment_fetcher_thread.start()
self._segment_data_feeder_thread = Thread( self._segment_data_feeder_thread = Thread(
target=self._run_segment_data_feeder, target=self._run_segment_data_feeder,
name=f'SegmentDataFeeder::{self._live.room_id}', name=f'SegmentDataFeeder::{self._live.room_id}',
daemon=True, daemon=True,
) )
self._segment_data_feeder_thread.start() self._segment_data_feeder_thread.start()
self._stream_processor_thread = Thread( self._stream_processor_thread = Thread(
target=self._run_stream_processor, target=self._run_stream_processor,
name=f'StreamProcessor::{self._live.room_id}', name=f'StreamProcessor::{self._live.room_id}',
daemon=True, daemon=True,
) )
self._stream_processor_thread.start() self._stream_processor_thread.start()
try: try:
self._main_loop() self._main_loop()
finally: finally:
if self._stream_processor is not None: if self._stream_processor is not None:
self._stream_processor.cancel() self._stream_processor.cancel()
self._segment_fetcher_thread.join(timeout=10) self._stream_processor_thread.join(timeout=10)
self._segment_data_feeder_thread.join(timeout=10) self._segment_fetcher_thread.join(timeout=10)
self._last_segment_uris.clear() self._segment_data_feeder_thread.join(timeout=10)
del self._segment_queue self._stream_remuxer.stop()
del self._segment_data_queue self._stream_remuxer.raise_for_exception()
self._last_segment_uris.clear()
del self._segment_queue
del self._segment_data_queue
except TryAgain: except TryAgain:
pass pass
except Exception as e: except Exception as e:
self._handle_exception(e) self._handle_exception(e)
finally: finally:
with suppress(Exception): self._stopped = True
self._stream_processor_thread.join(timeout=10)
with suppress(Exception): with suppress(Exception):
self._playlist_debug_file.close() self._playlist_debug_file.close()
self._emit_event('stream_recording_stopped') self._emit_event('stream_recording_stopped')
@ -201,6 +208,8 @@ class HLSStreamRecorder(
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
logger.warning(repr(e)) logger.warning(repr(e))
self._wait_for_connection_error() self._wait_for_connection_error()
except FailedToFetchSegments:
url = self._get_live_stream_url()
except RetryError as e: except RetryError as e:
logger.warning(repr(e)) logger.warning(repr(e))
@ -212,6 +221,14 @@ class HLSStreamRecorder(
self._stream_analysed = False self._stream_analysed = False
while not self._stopped: while not self._stopped:
if self._failed_to_fetch_segments.is_set():
with self._segment_queue.mutex:
self._segment_queue.queue.clear()
with self._ready_to_fetch_segments:
self._ready_to_fetch_segments.notify_all()
self._failed_to_fetch_segments.clear()
raise FailedToFetchSegments()
content = self._fetch_playlist(url) content = self._fetch_playlist(url)
playlist = m3u8.loads(content, uri=url) playlist = m3u8.loads(content, uri=url)
@ -252,8 +269,10 @@ class HLSStreamRecorder(
if playlist.is_endlist: if playlist.is_endlist:
logger.debug('playlist ended') logger.debug('playlist ended')
self._stopped = True self._run_coroutine(self._live.update_room_info())
break if not self._live.is_living():
self._stopped = True
break
time.sleep(1) time.sleep(1)
@ -272,6 +291,8 @@ class HLSStreamRecorder(
assert self._stream_remuxer is not None assert self._stream_remuxer is not None
init_section = None init_section = None
self._init_section_data = None self._init_section_data = None
num_of_continuously_failed = 0
self._failed_to_fetch_segments.clear()
while not self._stopped: while not self._stopped:
try: try:
@ -307,6 +328,13 @@ class HLSStreamRecorder(
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
logger.warning(f'Failed to fetch segment: {repr(e)}') logger.warning(f'Failed to fetch segment: {repr(e)}')
if e.response.status_code in (403, 404, 599): if e.response.status_code in (403, 404, 599):
num_of_continuously_failed += 1
if num_of_continuously_failed >= 3:
self._failed_to_fetch_segments.set()
with self._ready_to_fetch_segments:
self._ready_to_fetch_segments.wait()
num_of_continuously_failed = 0
self._failed_to_fetch_segments.clear()
break break
except requests.exceptions.ConnectionError as e: except requests.exceptions.ConnectionError as e:
logger.warning(repr(e)) logger.warning(repr(e))
@ -315,6 +343,7 @@ class HLSStreamRecorder(
logger.warning(repr(e)) logger.warning(repr(e))
break break
else: else:
num_of_continuously_failed = 0
break break
def _run_segment_data_feeder(self) -> None: def _run_segment_data_feeder(self) -> None:
@ -329,6 +358,8 @@ class HLSStreamRecorder(
def _segment_data_feeder(self) -> None: def _segment_data_feeder(self) -> None:
assert self._stream_remuxer is not None assert self._stream_remuxer is not None
MAX_SEGMENT_DATA_CACHE = 3
segment_data_cache: List[bytes] = []
bytes_io = io.BytesIO() bytes_io = io.BytesIO()
segment_count = 0 segment_count = 0
@ -359,10 +390,41 @@ class HLSStreamRecorder(
bytes_io = io.BytesIO() bytes_io = io.BytesIO()
segment_count = 0 segment_count = 0
self._stream_analysed = True self._stream_analysed = True
try: try:
if self._stream_remuxer.stopped:
self._stream_remuxer.start()
while True:
ready = self._stream_remuxer.wait(timeout=1)
if self._stopped:
return
if ready:
break
if segment_data_cache:
if self._init_section_data:
self._stream_remuxer.input.write(
self._init_section_data
)
for cached_data in segment_data_cache:
if cached_data == self._init_section_data:
continue
self._stream_remuxer.input.write(cached_data)
self._stream_remuxer.input.write(data) self._stream_remuxer.input.write(data)
except BrokenPipeError: except BrokenPipeError as e:
return if not self._stopped:
logger.warning(repr(e))
else:
logger.debug(repr(e))
except ValueError as e:
if not self._stopped:
logger.warning(repr(e))
else:
logger.debug(repr(e))
segment_data_cache.append(data)
if len(segment_data_cache) > MAX_SEGMENT_DATA_CACHE:
segment_data_cache.pop(0)
def _run_stream_processor(self) -> None: def _run_stream_processor(self) -> None:
logger.debug('Stream processor thread started') logger.debug('Stream processor thread started')
@ -392,15 +454,44 @@ class HLSStreamRecorder(
self._stream_processor.size_updates.subscribe(update_size) self._stream_processor.size_updates.subscribe(update_size)
try: try:
self._stream_host_available.wait() while not self._stopped:
self._stream_processor.set_metadata(self._make_metadata()) while True:
self._stream_processor.process_stream( ready = self._stream_remuxer.wait(timeout=1)
StreamProxy(self._stream_remuxer.output), # type: ignore if self._stopped:
) return
if ready:
break
self._stream_host_available.wait()
self._stream_processor.set_metadata(self._make_metadata())
try:
self._stream_processor.process_stream(
StreamProxy(
self._stream_remuxer.output,
read_timeout=10,
) # type: ignore
)
except BrokenPipeError as e:
logger.debug(repr(e))
except TimeoutError as e:
logger.debug(repr(e))
self._stream_remuxer.stop()
except FlvDataError as e:
logger.warning(repr(e))
self._stream_remuxer.stop()
except FlvStreamCorruptedError as e:
logger.warning(repr(e))
self._stream_remuxer.stop()
except ValueError as e:
logger.warning(repr(e))
self._stream_remuxer.stop()
except Exception as e: except Exception as e:
if not self._stopped: if not self._stopped:
logger.exception(e) logger.exception(e)
self._handle_exception(e) self._handle_exception(e)
else:
logger.debug(repr(e))
finally: finally:
self._stream_processor.finalize() self._stream_processor.finalize()
self._progress_bar = None self._progress_bar = None

View File

@ -1,14 +1,16 @@
import re
import os import os
import io import io
import errno import errno
import shlex import shlex
import logging import logging
from threading import Thread, Event from threading import Thread, Condition
from subprocess import Popen, PIPE, CalledProcessError from subprocess import Popen, PIPE, CalledProcessError
from typing import List, Optional, cast from typing import Optional, cast
from ..utils.mixins import StoppableMixin, SupportDebugMixin from ..utils.mixins import StoppableMixin, SupportDebugMixin
from ..utils.io import wait_for
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,15 +19,21 @@ logger = logging.getLogger(__name__)
__all__ = 'StreamRemuxer', __all__ = 'StreamRemuxer',
class FFmpegError(Exception):
pass
class StreamRemuxer(StoppableMixin, SupportDebugMixin): class StreamRemuxer(StoppableMixin, SupportDebugMixin):
_ERROR_PATTERN = re.compile(
r'\b(error|failed|missing|invalid|corrupt)\b', re.IGNORECASE
)
def __init__(self, room_id: int, bufsize: int = 1024 * 1024) -> None: def __init__(self, room_id: int, bufsize: int = 1024 * 1024) -> None:
super().__init__() super().__init__()
self._room_id = room_id self._room_id = room_id
self._bufsize = bufsize self._bufsize = bufsize
self._exception: Optional[Exception] = None self._exception: Optional[Exception] = None
self._subprocess_setup = Event() self._ready = Condition()
self._MAX_ERROR_MESSAGES = 10
self._error_messages: List[str] = []
self._env = None self._env = None
self._init_for_debug(room_id) self._init_for_debug(room_id)
@ -50,15 +58,22 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
def __enter__(self): # type: ignore def __enter__(self): # type: ignore
self.start() self.start()
self.wait_for_subprocess() self.wait()
return self return self
def __exit__(self, exc_type, value, traceback): # type: ignore def __exit__(self, exc_type, value, traceback): # type: ignore
self.stop() self.stop()
self.raise_for_exception() self.raise_for_exception()
def wait_for_subprocess(self) -> None: def wait(self, timeout: Optional[float] = None) -> bool:
self._subprocess_setup.wait() with self._ready:
return self._ready.wait(timeout=timeout)
def restart(self) -> None:
logger.debug('Restarting stream remuxer...')
self.stop()
self.start()
logger.debug('Restarted stream remuxer')
def raise_for_exception(self) -> None: def raise_for_exception(self) -> None:
if not self.exception: if not self.exception:
@ -85,12 +100,17 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
def _run(self) -> None: def _run(self) -> None:
logger.debug('Started stream remuxer') logger.debug('Started stream remuxer')
self._exception = None self._exception = None
self._error_messages.clear()
self._subprocess_setup.clear()
try: try:
self._run_subprocess() self._run_subprocess()
except BrokenPipeError: except BrokenPipeError as e:
pass logger.debug(repr(e))
except FFmpegError as e:
if not self._stopped:
logger.warning(repr(e))
else:
logger.debug(repr(e))
except TimeoutError as e:
logger.debug(repr(e))
except Exception as e: except Exception as e:
# OSError: [Errno 22] Invalid argument # OSError: [Errno 22] Invalid argument
# https://stackoverflow.com/questions/23688492/oserror-errno-22-invalid-argument-in-subprocess # https://stackoverflow.com/questions/23688492/oserror-errno-22-invalid-argument-in-subprocess
@ -100,43 +120,43 @@ class StreamRemuxer(StoppableMixin, SupportDebugMixin):
self._exception = e self._exception = e
logger.exception(e) logger.exception(e)
finally: finally:
self._stopped = True
logger.debug('Stopped stream remuxer') logger.debug('Stopped stream remuxer')
def _run_subprocess(self) -> None: def _run_subprocess(self) -> None:
cmd = 'ffmpeg -i pipe:0 -c copy -f flv pipe:1' cmd = 'ffmpeg -xerror -i pipe:0 -c copy -copyts -f flv pipe:1'
args = shlex.split(cmd) args = shlex.split(cmd)
with Popen( with Popen(
args, stdin=PIPE, stdout=PIPE, stderr=PIPE, args, stdin=PIPE, stdout=PIPE, stderr=PIPE,
bufsize=self._bufsize, env=self._env, bufsize=self._bufsize, env=self._env,
) as self._subprocess: ) as self._subprocess:
self._subprocess_setup.set() with self._ready:
assert self._subprocess.stderr is not None self._ready.notify_all()
while not self._stopped: assert self._subprocess.stderr is not None
data = self._subprocess.stderr.readline() with io.TextIOWrapper(
if not data: self._subprocess.stderr,
if self._subprocess.poll() is not None: encoding='utf-8',
break errors='backslashreplace'
else: ) as stderr:
continue while not self._stopped:
line = data.decode('utf-8', errors='backslashreplace') line = wait_for(stderr.readline, timeout=10)
if self._debug: if not line:
logger.debug('ffmpeg: %s', line) if self._subprocess.poll() is not None:
self._check_error(line) break
else:
continue
if self._debug:
logger.debug('ffmpeg: %s', line)
self._check_error(line)
if not self._stopped and self._subprocess.returncode not in (0, 255): if not self._stopped and self._subprocess.returncode not in (0, 255):
# 255: Exiting normally, received signal 2. # 255: Exiting normally, received signal 2.
raise CalledProcessError( raise CalledProcessError(self._subprocess.returncode, cmd=cmd)
self._subprocess.returncode,
cmd=cmd,
output='\n'.join(self._error_messages),
)
def _check_error(self, line: str) -> None: def _check_error(self, line: str) -> None:
if 'error' not in line.lower() and 'failed' not in line.lower(): match = self._ERROR_PATTERN.search(line)
if not match:
return return
logger.warning(f'ffmpeg error: {line}') raise FFmpegError(line)
self._error_messages.append(line)
if len(self._error_messages) > self._MAX_ERROR_MESSAGES:
self._error_messages.remove(self._error_messages[0])

View File

@ -116,8 +116,8 @@ class FlvTagHeader:
filtered: bool filtered: bool
tag_type: TagType tag_type: TagType
data_size: int = attr.ib(validator=[non_negative_integer_validator]) data_size: int = attr.ib(validator=[non_negative_integer_validator])
timestamp: int = attr.ib(validator=[non_negative_integer_validator]) timestamp: int
stream_id: int = attr.ib(validator=[non_negative_integer_validator]) stream_id: int
@attr.s(auto_attribs=True, slots=True, frozen=True) @attr.s(auto_attribs=True, slots=True, frozen=True)

View File

@ -33,8 +33,7 @@ from .exceptions import (
from .common import ( from .common import (
is_metadata_tag, parse_metadata, is_audio_tag, is_video_tag, is_metadata_tag, parse_metadata, is_audio_tag, is_video_tag,
is_video_sequence_header, is_audio_sequence_header, is_video_sequence_header, is_audio_sequence_header,
is_audio_data_tag, is_video_data_tag, enrich_metadata, update_metadata, enrich_metadata, update_metadata, is_data_tag, read_tags_in_duration,
is_data_tag, read_tags_in_duration,
) )
from ..path import extra_metadata_path from ..path import extra_metadata_path
if TYPE_CHECKING: if TYPE_CHECKING:
@ -562,18 +561,26 @@ class StreamProcessor:
return header return header
def _ensure_ts_correct(self, tag: FlvTag) -> None: def _ensure_ts_correct(self, tag: FlvTag) -> None:
if not is_audio_data_tag(tag) and not is_video_data_tag(tag): if not tag.timestamp + self._delta < 0:
return return
if tag.timestamp + self._delta < 0: logger.warning(
f'Incorrect timestamp: {tag.timestamp + self._delta}\n'
f'last output tag: {self._last_tags[0]}\n'
f'current tag: {tag}'
)
if tag.is_audio_tag() or tag.is_video_tag():
self._delta = ( self._delta = (
self._last_tags[0].timestamp + self._last_tags[0].timestamp +
self._in_reader.calc_interval(tag) - tag.timestamp self._in_reader.calc_interval(tag) - tag.timestamp
) )
logger.warning( logger.debug(f'Updated delta: {self._delta}')
f'Incorrect timestamp, updated delta: {self._delta}\n' elif tag.is_script_tag():
f'last output tag: {self._last_tags[0]}\n' self._delta = (
f'current tag: {tag}' self._last_tags[0].timestamp - tag.timestamp
) )
logger.debug(f'Updated delta: {self._delta}')
else:
pass
def _correct_ts(self, tag: FlvTag, delta: int) -> FlvTag: def _correct_ts(self, tag: FlvTag, delta: int) -> FlvTag:
if delta == 0 and tag.timestamp >= 0: if delta == 0 and tag.timestamp >= 0:

29
src/blrec/utils/io.py Normal file
View File

@ -0,0 +1,29 @@
import atexit
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as _TimeoutError
from typing import Any, Callable, Iterable, Mapping, Optional, TypeVar

_T = TypeVar('_T')

# Lazily created, process-wide worker pool shared by every wait_for() call.
# NOTE(review): creation below is not lock-protected; two threads racing the
# first call could each build a pool (one would leak) — confirm callers are
# effectively single-threaded at first use, or add a module-level lock.
_executor: Optional[ThreadPoolExecutor] = None


def wait_for(
    func: Callable[..., _T],
    *,
    args: Iterable[Any] = (),
    kwargs: Optional[Mapping[str, Any]] = None,
    timeout: float
) -> _T:
    """Run *func* in a background thread and wait at most *timeout* seconds.

    Args:
        func: The callable to execute.
        args: Positional arguments passed to *func*.
        kwargs: Keyword arguments passed to *func*; ``None`` means no
            keyword arguments.  (Immutable/sentinel defaults replace the
            original mutable ``[]`` / ``{}`` defaults.)
        timeout: Maximum number of seconds to wait for the result.

    Returns:
        Whatever *func* returns.

    Raises:
        TimeoutError: If *func* does not finish within *timeout* seconds.
            The worker thread keeps running; only the wait is abandoned.
        Exception: Any exception raised by *func* itself is re-raised here.
    """
    if kwargs is None:
        kwargs = {}
    global _executor
    if _executor is None:
        _executor = ThreadPoolExecutor(thread_name_prefix='wait_for')
        # Ensure worker threads are joined at interpreter shutdown.
        atexit.register(_executor.shutdown)
    future = _executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except _TimeoutError:
        # Translate the concurrent.futures flavour into the builtin
        # TimeoutError; `from None` hides the internal cause.
        raise TimeoutError(timeout, func, args, kwargs) from None

View File

@ -1,6 +1,7 @@
import os import os
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import asyncio import asyncio
import threading
from typing import Awaitable, TypeVar, final from typing import Awaitable, TypeVar, final
@ -8,24 +9,28 @@ class SwitchableMixin(ABC):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self._enabled = False self._enabled = False
self._enabled_lock = threading.Lock()
@property @property
def enabled(self) -> bool: def enabled(self) -> bool:
return self._enabled with self._enabled_lock:
return self._enabled
@final @final
def enable(self) -> None: def enable(self) -> None:
if self._enabled: with self._enabled_lock:
return if self._enabled:
self._enabled = True return
self._do_enable() self._enabled = True
self._do_enable()
@final @final
def disable(self) -> None: def disable(self) -> None:
if not self._enabled: with self._enabled_lock:
return if not self._enabled:
self._enabled = False return
self._do_disable() self._enabled = False
self._do_disable()
@abstractmethod @abstractmethod
def _do_enable(self) -> None: def _do_enable(self) -> None:
@ -40,24 +45,28 @@ class StoppableMixin(ABC):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self._stopped = True self._stopped = True
self._stopped_lock = threading.Lock()
@property @property
def stopped(self) -> bool: def stopped(self) -> bool:
return self._stopped with self._stopped_lock:
return self._stopped
@final @final
def start(self) -> None: def start(self) -> None:
if not self._stopped: with self._stopped_lock:
return if not self._stopped:
self._stopped = False return
self._do_start() self._stopped = False
self._do_start()
@final @final
def stop(self) -> None: def stop(self) -> None:
if self._stopped: with self._stopped_lock:
return if self._stopped:
self._stopped = True return
self._do_stop() self._stopped = True
self._do_stop()
@abstractmethod @abstractmethod
def _do_start(self) -> None: def _do_start(self) -> None:

View File

@ -6,6 +6,7 @@ from fastapi import FastAPI, status, Request, Depends
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from starlette.responses import Response
from pydantic import ValidationError from pydantic import ValidationError
from pkg_resources import resource_filename from pkg_resources import resource_filename
@ -143,6 +144,20 @@ class WebAppFiles(StaticFiles):
path = 'index.html' path = 'index.html'
return await super().lookup_path(path) return await super().lookup_path(path)
def file_response(self, full_path: str, *args, **kwargs) -> Response:  # type: ignore # noqa
    # Serve a static file, forcing the correct MIME type for JavaScript.
    #
    # Python's mimetypes module consults the Windows registry, where ".js"
    # can be mapped to a non-executable type (e.g. "text/plain"), which
    # makes browsers refuse to run the web app's scripts.
    # Workaround for https://github.com/acgnhiki/blrec/issues/12
    response = super().file_response(full_path, *args, **kwargs)
    if full_path.endswith('.js'):
        js_media_type = 'application/javascript'
        if response.media_type != js_media_type:
            response.media_type = js_media_type
            # Rewrite the already-rendered header list so the corrected
            # content-type is actually sent on the wire, then drop the
            # cached headers object so it is rebuilt from raw_headers.
            # NOTE(review): `_headers` is a private starlette attribute —
            # this may break on a starlette upgrade; verify against the
            # pinned starlette version.
            headers = response.headers
            headers['content-type'] = js_media_type
            response.raw_headers = headers.raw
            del response._headers
    return response
directory = resource_filename(__name__, '../data/webapp') directory = resource_filename(__name__, '../data/webapp')
api.mount('/', WebAppFiles(directory=directory, html=True), name='webapp') api.mount('/', WebAppFiles(directory=directory, html=True), name='webapp')