From 3e6c5d92a456b84e942cd1eae81223f037a2d259 Mon Sep 17 00:00:00 2001 From: acgnhik Date: Sun, 5 Jun 2022 11:52:52 +0800 Subject: [PATCH] refactor: refactor danmaku client --- src/blrec/bili/danmaku_client.py | 65 +++++++++++++++++++------------- src/blrec/bili/exceptions.py | 6 ++- 2 files changed, 44 insertions(+), 27 deletions(-) diff --git a/src/blrec/bili/danmaku_client.py b/src/blrec/bili/danmaku_client.py index 1b3e65f..f44bc38 100644 --- a/src/blrec/bili/danmaku_client.py +++ b/src/blrec/bili/danmaku_client.py @@ -9,13 +9,14 @@ from typing import Any, Dict, Final, List, Optional, Tuple, Union, cast import aiohttp import brotli from aiohttp import ClientSession -from tenacity import retry, retry_if_exception_type, wait_exponential +from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential from ..event.event_emitter import EventEmitter, EventListener from ..exception import exception_callback from ..logging.room_id import aio_task_with_room_id from ..utils.mixins import AsyncStoppableMixin from .api import AppApi, WebApi +from .exceptions import DanmakuClientAuthError from .typing import ApiPlatform, Danmaku __all__ = 'DanmakuClient', 'DanmakuListener', 'Danmaku', 'DanmakuCommand' @@ -85,7 +86,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): await self._emit('client_reconnected') @retry( - wait=wait_exponential(multiplier=0.1, max=60), + wait=wait_exponential(multiplier=0.1, max=10), retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)), ) async def _connect(self) -> None: @@ -96,8 +97,11 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): reply = await self._recieve_auth_reply() await self._handle_auth_reply(reply) except Exception: - self._rotate_api_platform() - await self._update_danmu_info() + self._host_index += 1 + if self._host_index >= len(self._danmu_info['host_list']): + self._host_index = 0 + self._rotate_api_platform() + await self._update_danmu_info() raise else: logger.debug('Connected to server') @@ -113,8 +117,6 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): self._ws = await self.session.ws_connect(url, timeout=5) except Exception as exc: logger.debug(f'Failed to connect WebSocket: {repr(exc)}') - host_count = len(self._danmu_info['host_list']) - self._host_index = (self._host_index + 1) % host_count raise else: logger.debug('Connected WebSocket') @@ -163,11 +165,9 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): logger.debug('Auth OK') self._create_heartbeat_task() elif code == WS.AUTH_TOKEN_ERROR: - logger.debug('Token expired, will try to reconnect.') - await self._update_danmu_info() - raise ValueError(f'Token expired: {code}') + raise DanmakuClientAuthError(f'Token expired: {code}') else: - raise ValueError(f'Unexpected code: {code}') + raise DanmakuClientAuthError(f'Unexpected code: {code}') def _rotate_api_platform(self) -> None: if self._api_platform == 'android': @@ -175,13 +175,24 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): else: self._api_platform = 'android' + @retry( + retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)), + wait=wait_exponential(max=10), + stop=stop_after_delay(60), + ) async def _update_danmu_info(self) -> None: logger.debug(f'Updating danmu info via {self._api_platform} api...') + api: Union[WebApi, AppApi] if self._api_platform == 'web': - self._danmu_info = await self.webapi.get_danmu_info(self._room_id) + api = self.webapi else: - self._danmu_info = await self.appapi.get_danmu_info(self._room_id) - logger.debug('Danmu info updated') + api = self.appapi + try: + self._danmu_info = await api.get_danmu_info(self._room_id) + except Exception as exc: + logger.warning(f'Failed to update danmu info: {repr(exc)}') + else: + logger.debug('Danmu info updated') async def _disconnect(self) -> None: await self._cancel_heartbeat_task() @@ -232,25 +243,27 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): async def _dispatch_message(self, msg: Dict[str, Any]) -> None: await self._emit('danmaku_received', msg) - @retry(retry=retry_if_exception_type((asyncio.TimeoutError,))) async def _receive(self) -> List[Dict[str, Any]]: self._retry_count = 0 self._retry_delay = 0 while True: - wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL) - - if wsmsg.type == aiohttp.WSMsgType.BINARY: - if result := await self._handle_data(wsmsg.data): - return result - elif wsmsg.type == aiohttp.WSMsgType.ERROR: - await self._handle_error(cast(Exception, wsmsg.data)) - elif wsmsg.type == aiohttp.WSMsgType.CLOSED: - msg = 'WebSocket Closed' - exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg) - await self._handle_error(exc) + try: + wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL) + except Exception as e: + await self._handle_error(e) else: - raise ValueError(wsmsg) + if wsmsg.type == aiohttp.WSMsgType.BINARY: + if result := await self._handle_data(wsmsg.data): + return result + elif wsmsg.type == aiohttp.WSMsgType.ERROR: + await self._handle_error(cast(Exception, wsmsg.data)) + elif wsmsg.type == aiohttp.WSMsgType.CLOSED: + msg = 'WebSocket Closed' + exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg) + await self._handle_error(exc) + else: + await self._handle_error(ValueError(wsmsg)) @staticmethod async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]: diff --git a/src/blrec/bili/exceptions.py b/src/blrec/bili/exceptions.py index e1201e0..c4844e4 100644 --- a/src/blrec/bili/exceptions.py +++ b/src/blrec/bili/exceptions.py @@ -1,5 +1,5 @@ - import attr +import aiohttp @attr.s(auto_attribs=True, frozen=True, slots=True) @@ -8,6 +8,10 @@ class ApiRequestError(Exception): message: str +class DanmakuClientAuthError(aiohttp.ClientError): + pass + + class LiveRoomHidden(Exception): pass