refactor: improve robustness

This commit is contained in:
acgnhik 2022-06-18 16:03:04 +08:00
parent 54cf481931
commit 7de0e5e4be
7 changed files with 113 additions and 118 deletions

View File

@ -9,7 +9,7 @@ from typing import Any, Dict, Final, List, Optional, Tuple, Union, cast
import aiohttp
import brotli
from aiohttp import ClientSession
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from tenacity import retry, retry_if_exception_type, wait_exponential
from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback
@ -61,6 +61,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
self._room_id = room_id
self._api_platform: ApiPlatform = 'web'
self._danmu_info: Dict[str, Any] = COMMON_DANMU_INFO
self._host_index: int = 0
self._retry_delay: int = 0
self._MAX_RETRIES: Final[int] = max_retries
@ -177,11 +178,6 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
else:
self._api_platform = 'android'
@retry(
retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)),
wait=wait_exponential(max=10),
stop=stop_after_delay(60),
)
async def _update_danmu_info(self) -> None:
logger.debug(f'Updating danmu info via {self._api_platform} api...')
api: Union[WebApi, AppApi]
@ -193,7 +189,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
self._danmu_info = await api.get_danmu_info(self._room_id)
except Exception as exc:
logger.warning(f'Failed to update danmu info: {repr(exc)}')
raise
self._danmu_info = COMMON_DANMU_INFO
else:
logger.debug('Danmu info updated')
@ -460,3 +456,16 @@ class DanmakuCommand(Enum):
WISH_BOTTLE = 'WISH_BOTTLE'
GUARD_BUY = 'GUARD_BUY'
# ...
COMMON_DANMU_INFO: Final[Dict[str, Any]] = {
"token": "",
"host_list": [
{
"host": "broadcastlv.chat.bilibili.com",
"port": 2243,
"wss_port": 443,
"ws_port": 2244,
}
],
}

View File

@ -1,8 +1,8 @@
import asyncio
import time
import json
import logging
import re
import time
from typing import Dict, List, cast
import aiohttp
@ -130,17 +130,40 @@ class Live:
else:
return True
async def update_info(self) -> None:
await asyncio.wait([self.update_user_info(), self.update_room_info()])
async def update_info(self, raise_exception: bool = False) -> bool:
return all(
await asyncio.gather(
self.update_user_info(raise_exception=raise_exception),
self.update_room_info(raise_exception=raise_exception),
)
)
async def update_user_info(self) -> None:
self._user_info = await self.get_user_info(self._room_info.uid)
async def update_user_info(self, raise_exception: bool = False) -> bool:
try:
self._user_info = await self.get_user_info(self._room_info.uid)
except Exception as e:
logger.error(f'Failed to update user info: {repr(e)}')
if raise_exception:
raise
return False
else:
return True
async def update_room_info(self) -> None:
self._room_info = await self.get_room_info()
async def update_room_info(self, raise_exception: bool = False) -> bool:
try:
self._room_info = await self.get_room_info()
except Exception as e:
logger.error(f'Failed to update room info: {repr(e)}')
if raise_exception:
raise
return False
else:
return True
@retry(
retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)),
retry=retry_if_exception_type(
(asyncio.TimeoutError, aiohttp.ClientError, ValueError)
),
wait=wait_exponential(max=10),
stop=stop_after_delay(60),
)
@ -278,8 +301,9 @@ class Live:
async with self._session.get(self._html_page_url) as response:
data = await response.read()
m = _INFO_PATTERN.search(data)
assert m is not None, data
match = _INFO_PATTERN.search(data)
if not match:
raise ValueError('Can not extract info from html page')
string = m.group(1).decode(encoding='utf8')
string = match.group(1).decode(encoding='utf8')
return json.loads(string)

View File

@ -68,7 +68,7 @@ class LiveMonitor(
# such as an operating system hibernation.
logger.warning('The Danmaku Client Reconnected')
await self._live.update_info()
await self._live.update_room_info()
current_status = self._live.room_info.live_status
if current_status == self._previous_status:
@ -104,7 +104,7 @@ class LiveMonitor(
self._previous_status.name, current_status.name
))
await self._live.update_info()
await self._live.update_room_info()
if (s := self._live.room_info.live_status) != current_status:
logger.warning(
'Updated live status {} is inconsistent with '

View File

@ -70,7 +70,7 @@ class CoverDownloader(StreamRecorderEventListener, SwitchableMixin):
@aio_task_with_room_id
async def _save_cover(self, video_path: str) -> None:
try:
await self._live.update_info()
await self._live.update_room_info()
cover_url = self._live.room_info.cover
data = await self._fetch_cover(cover_url)
sha1 = sha1sum(data)

View File

@ -403,10 +403,6 @@ class JoinPointExtractor:
script_data = parse_scriptdata(join_point_tag)
join_point_data = cast(JoinPointData, script_data['value'])
assert next_tag.body is not None, next_tag
assert cksum(next_tag.body) == join_point_data['crc32'], (
join_point_tag,
next_tag,
)
join_point = JoinPoint(
seamless=join_point_data['seamless'],
timestamp=next_tag.timestamp,

View File

@ -1,35 +1,35 @@
import os
import logging
import os
from pathlib import Path
from typing import Iterator, Optional
from .models import (
TaskStatus,
RunningStatus,
VideoFileStatus,
VideoFileDetail,
DanmukuFileStatus,
DanmakuFileDetail,
)
from ..bili.live import Live
from ..bili.models import RoomInfo, UserInfo
from ..bili.danmaku_client import DanmakuClient
from ..bili.live import Live
from ..bili.live_monitor import LiveMonitor
from ..bili.typing import StreamFormat, QualityNumber
from ..bili.models import RoomInfo, UserInfo
from ..bili.typing import QualityNumber, StreamFormat
from ..core import Recorder
from ..flv.operators import MetaData, StreamProfile
from ..core.cover_downloader import CoverSaveStrategy
from ..postprocess import Postprocessor, PostprocessorStatus, DeleteStrategy
from ..postprocess.remux import RemuxingProgress
from ..flv.metadata_injection import InjectingProgress
from ..event.event_submitters import (
LiveEventSubmitter, RecorderEventSubmitter, PostprocessorEventSubmitter
LiveEventSubmitter,
PostprocessorEventSubmitter,
RecorderEventSubmitter,
)
from ..flv.metadata_injection import InjectingProgress
from ..flv.operators import MetaData, StreamProfile
from ..logging.room_id import aio_task_with_room_id
from ..postprocess import DeleteStrategy, Postprocessor, PostprocessorStatus
from ..postprocess.remux import RemuxingProgress
from .models import (
DanmakuFileDetail,
DanmukuFileStatus,
RunningStatus,
TaskStatus,
VideoFileDetail,
VideoFileStatus,
)
__all__ = 'RecordTask',
__all__ = ('RecordTask',)
logger = logging.getLogger(__name__)
@ -118,9 +118,7 @@ class RecordTask:
recording_path=self.recording_path,
postprocessor_status=self._postprocessor.status,
postprocessing_path=self._postprocessor.postprocessing_path,
postprocessing_progress=(
self._postprocessor.postprocessing_progress
),
postprocessing_progress=(self._postprocessor.postprocessing_progress),
)
@property
@ -163,11 +161,7 @@ class RecordTask:
# disabling recorder by force or stoping task by force
status = VideoFileStatus.BROKEN
yield VideoFileDetail(
path=path,
size=size,
status=status,
)
yield VideoFileDetail(path=path, size=size, status=status)
@property
def danmaku_file_details(self) -> Iterator[DanmakuFileDetail]:
@ -192,11 +186,7 @@ class RecordTask:
# disabling recorder by force or stoping task by force
status = DanmukuFileStatus.BROKEN
yield DanmakuFileDetail(
path=path,
size=size,
status=status,
)
yield DanmakuFileDetail(path=path, size=size, status=status)
@property
def user_agent(self) -> str:
@ -460,8 +450,8 @@ class RecordTask:
await self._recorder.stop()
await self._postprocessor.stop()
async def update_info(self) -> None:
await self._live.update_info()
async def update_info(self, raise_exception: bool = False) -> bool:
return await self._live.update_info(raise_exception=raise_exception)
@aio_task_with_room_id
async def update_session(self) -> None:
@ -488,10 +478,7 @@ class RecordTask:
def _setup_danmaku_client(self) -> None:
self._danmaku_client = DanmakuClient(
self._live.session,
self._live.appapi,
self._live.webapi,
self._live.room_id
self._live.session, self._live.appapi, self._live.webapi, self._live.room_id
)
def _setup_live_monitor(self) -> None:
@ -522,8 +509,9 @@ class RecordTask:
)
def _setup_postprocessor_event_submitter(self) -> None:
self._postprocessor_event_submitter = \
PostprocessorEventSubmitter(self._postprocessor)
self._postprocessor_event_submitter = PostprocessorEventSubmitter(
self._postprocessor
)
async def _destroy(self) -> None:
self._destroy_postprocessor_event_submitter()

View File

@ -1,34 +1,31 @@
from __future__ import annotations
import asyncio
import logging
from typing import Dict, Iterator, Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Dict, Iterator, Optional
import aiohttp
from tenacity import (
retry,
wait_exponential,
stop_after_delay,
retry_if_exception_type,
)
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from .task import RecordTask
from .models import TaskData, TaskParam, VideoFileDetail, DanmakuFileDetail
from ..flv.operators import MetaData, StreamProfile
from ..exception import submit_exception, NotFoundError
from ..bili.exceptions import ApiRequestError
from ..exception import NotFoundError, submit_exception
from ..flv.operators import MetaData, StreamProfile
from .models import DanmakuFileDetail, TaskData, TaskParam, VideoFileDetail
from .task import RecordTask
if TYPE_CHECKING:
from ..setting import SettingsManager
from ..setting import (
HeaderSettings,
DanmakuSettings,
RecorderSettings,
PostprocessingSettings,
TaskSettings,
HeaderSettings,
OutputSettings,
PostprocessingSettings,
RecorderSettings,
TaskSettings,
)
__all__ = 'RecordTaskManager',
__all__ = ('RecordTaskManager',)
logger = logging.getLogger(__name__)
@ -57,9 +54,7 @@ class RecordTaskManager:
logger.info('Destroying all tasks...')
if not self._tasks:
return
await asyncio.wait([
t.destroy() for t in self._tasks.values() if t.ready
])
await asyncio.wait([t.destroy() for t in self._tasks.values() if t.ready])
self._tasks.clear()
logger.info('Successfully destroyed all task')
@ -68,9 +63,9 @@ class RecordTaskManager:
@retry(
reraise=True,
retry=retry_if_exception_type((
asyncio.TimeoutError, aiohttp.ClientError, ApiRequestError,
)),
retry=retry_if_exception_type(
(asyncio.TimeoutError, aiohttp.ClientError, ApiRequestError)
),
wait=wait_exponential(max=10),
stop=stop_after_delay(60),
)
@ -104,9 +99,7 @@ class RecordTaskManager:
if settings.enable_recorder:
await task.enable_recorder()
except Exception as e:
logger.error(
f'Failed to add task {settings.room_id} due to: {repr(e)}'
)
logger.error(f'Failed to add task {settings.room_id} due to: {repr(e)}')
del self._tasks[settings.room_id]
raise
@ -120,9 +113,7 @@ class RecordTaskManager:
del self._tasks[room_id]
async def remove_all_tasks(self) -> None:
coros = [
self.remove_task(i) for i, t in self._tasks.items() if t.ready
]
coros = [self.remove_task(i) for i, t in self._tasks.items() if t.ready]
if coros:
await asyncio.wait(coros)
@ -168,9 +159,7 @@ class RecordTaskManager:
task = self._get_task(room_id, check_ready=True)
await task.enable_recorder()
async def disable_task_recorder(
self, room_id: int, force: bool = False
) -> None:
async def disable_task_recorder(self, room_id: int, force: bool = False) -> None:
task = self._get_task(room_id, check_ready=True)
await task.disable_recorder(force)
@ -180,9 +169,7 @@ class RecordTaskManager:
await asyncio.wait(coros)
async def disable_all_task_recorders(self, force: bool = False) -> None:
coros = [
t.disable_recorder(force) for t in self._tasks.values() if t.ready
]
coros = [t.disable_recorder(force) for t in self._tasks.values() if t.ready]
if coros:
await asyncio.wait(coros)
@ -206,9 +193,7 @@ class RecordTaskManager:
task = self._get_task(room_id, check_ready=True)
return task.stream_profile
def get_task_video_file_details(
self, room_id: int
) -> Iterator[VideoFileDetail]:
def get_task_video_file_details(self, room_id: int) -> Iterator[VideoFileDetail]:
task = self._get_task(room_id, check_ready=True)
yield from task.video_file_details
@ -228,27 +213,22 @@ class RecordTaskManager:
async def update_task_info(self, room_id: int) -> None:
task = self._get_task(room_id, check_ready=True)
await task.update_info()
await task.update_info(raise_exception=True)
async def update_all_task_infos(self) -> None:
coros = [t.update_info() for t in self._tasks.values() if t.ready]
coros = [
t.update_info(raise_exception=True) for t in self._tasks.values() if t.ready
]
if coros:
await asyncio.wait(coros)
async def apply_task_header_settings(
self,
room_id: int,
settings: HeaderSettings,
*,
update_session: bool = True,
self, room_id: int, settings: HeaderSettings, *, update_session: bool = True
) -> None:
task = self._get_task(room_id)
# avoid unnecessary updates that will interrupt connections
if (
task.user_agent == settings.user_agent and
task.cookie == settings.cookie
):
if task.user_agent == settings.user_agent and task.cookie == settings.cookie:
return
task.user_agent = settings.user_agent
@ -338,7 +318,5 @@ class RecordTaskManager:
def _make_task_data(self, task: RecordTask) -> TaskData:
return TaskData(
user_info=task.user_info,
room_info=task.room_info,
task_status=task.status,
user_info=task.user_info, room_info=task.room_info, task_status=task.status
)