refactor: refactor danmaku client

This commit is contained in:
acgnhik 2022-06-05 11:52:52 +08:00
parent 5229141d02
commit 3e6c5d92a4
2 changed files with 44 additions and 27 deletions

View File

@ -9,13 +9,14 @@ from typing import Any, Dict, Final, List, Optional, Tuple, Union, cast
import aiohttp
import brotli
from aiohttp import ClientSession
from tenacity import retry, retry_if_exception_type, wait_exponential
from tenacity import retry, retry_if_exception_type, stop_after_delay, wait_exponential
from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncStoppableMixin
from .api import AppApi, WebApi
from .exceptions import DanmakuClientAuthError
from .typing import ApiPlatform, Danmaku
__all__ = 'DanmakuClient', 'DanmakuListener', 'Danmaku', 'DanmakuCommand'
@ -85,7 +86,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
await self._emit('client_reconnected')
@retry(
wait=wait_exponential(multiplier=0.1, max=60),
wait=wait_exponential(multiplier=0.1, max=10),
retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)),
)
async def _connect(self) -> None:
@ -96,8 +97,11 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
reply = await self._recieve_auth_reply()
await self._handle_auth_reply(reply)
except Exception:
self._rotate_api_platform()
await self._update_danmu_info()
self._host_index += 1
if self._host_index >= len(self._danmu_info['host_list']):
self._host_index = 0
self._rotate_api_platform()
await self._update_danmu_info()
raise
else:
logger.debug('Connected to server')
@ -113,8 +117,6 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
self._ws = await self.session.ws_connect(url, timeout=5)
except Exception as exc:
logger.debug(f'Failed to connect WebSocket: {repr(exc)}')
host_count = len(self._danmu_info['host_list'])
self._host_index = (self._host_index + 1) % host_count
raise
else:
logger.debug('Connected WebSocket')
@ -163,11 +165,9 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
logger.debug('Auth OK')
self._create_heartbeat_task()
elif code == WS.AUTH_TOKEN_ERROR:
logger.debug('Token expired, will try to reconnect.')
await self._update_danmu_info()
raise ValueError(f'Token expired: {code}')
raise DanmakuClientAuthError(f'Token expired: {code}')
else:
raise ValueError(f'Unexpected code: {code}')
raise DanmakuClientAuthError(f'Unexpected code: {code}')
def _rotate_api_platform(self) -> None:
if self._api_platform == 'android':
@ -175,13 +175,24 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
else:
self._api_platform = 'android'
@retry(
retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)),
wait=wait_exponential(max=10),
stop=stop_after_delay(60),
)
async def _update_danmu_info(self) -> None:
logger.debug(f'Updating danmu info via {self._api_platform} api...')
api: Union[WebApi, AppApi]
if self._api_platform == 'web':
self._danmu_info = await self.webapi.get_danmu_info(self._room_id)
api = self.webapi
else:
self._danmu_info = await self.appapi.get_danmu_info(self._room_id)
logger.debug('Danmu info updated')
api = self.appapi
try:
self._danmu_info = await api.get_danmu_info(self._room_id)
except Exception as exc:
logger.warning(f'Failed to update danmu info: {repr(exc)}')
else:
logger.debug('Danmu info updated')
async def _disconnect(self) -> None:
await self._cancel_heartbeat_task()
@ -232,25 +243,27 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
async def _dispatch_message(self, msg: Dict[str, Any]) -> None:
await self._emit('danmaku_received', msg)
@retry(retry=retry_if_exception_type((asyncio.TimeoutError,)))
async def _receive(self) -> List[Dict[str, Any]]:
self._retry_count = 0
self._retry_delay = 0
while True:
wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL)
if wsmsg.type == aiohttp.WSMsgType.BINARY:
if result := await self._handle_data(wsmsg.data):
return result
elif wsmsg.type == aiohttp.WSMsgType.ERROR:
await self._handle_error(cast(Exception, wsmsg.data))
elif wsmsg.type == aiohttp.WSMsgType.CLOSED:
msg = 'WebSocket Closed'
exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg)
await self._handle_error(exc)
try:
wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL)
except Exception as e:
await self._handle_error(e)
else:
raise ValueError(wsmsg)
if wsmsg.type == aiohttp.WSMsgType.BINARY:
if result := await self._handle_data(wsmsg.data):
return result
elif wsmsg.type == aiohttp.WSMsgType.ERROR:
await self._handle_error(cast(Exception, wsmsg.data))
elif wsmsg.type == aiohttp.WSMsgType.CLOSED:
msg = 'WebSocket Closed'
exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg)
await self._handle_error(exc)
else:
await self._handle_error(ValueError(wsmsg))
@staticmethod
async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]:

View File

@ -1,5 +1,5 @@
import attr
import aiohttp
@attr.s(auto_attribs=True, frozen=True, slots=True)
@ -8,6 +8,10 @@ class ApiRequestError(Exception):
message: str
class DanmakuClientAuthError(aiohttp.ClientError):
pass
class LiveRoomHidden(Exception):
pass