refactor: refactor danmaku client

This commit is contained in:
acgnhik 2022-12-04 14:29:07 +08:00
parent 46811d4677
commit 08130d5e61

View File

@ -52,7 +52,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
webapi: WebApi,
room_id: int,
*,
max_retries: int = 10,
max_retries: int = 60,
headers: Optional[Dict[str, str]] = None,
) -> None:
super().__init__()
@ -237,7 +237,11 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
try:
await self._ws.send_bytes(data)
except Exception as exc:
logger.debug(f'Failed to send heartbeat due to: {repr(exc)}')
logger.warning(f'Failed to send heartbeat: {repr(exc)}')
await self._emit('error_occurred', exc)
task = asyncio.create_task(self.restart())
task.add_done_callback(exception_callback)
break
await asyncio.sleep(self._HEARTBEAT_INTERVAL)
async def _create_message_loop(self) -> None:
@ -261,48 +265,53 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
await self._emit('danmaku_received', msg)
async def _receive(self) -> List[Dict[str, Any]]:
self._retry_count = 0
self._retry_delay = 0
self._reset_retry()
while True:
try:
wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL)
except asyncio.TimeoutError as e:
logger.debug(f'Failed to receive message due to: {repr(e)}')
continue
wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL * 2)
except Exception as e:
await self._handle_error(e)
await self._handle_receive_error(e)
else:
if wsmsg.type == aiohttp.WSMsgType.BINARY:
if result := await self._handle_data(wsmsg.data):
return result
elif wsmsg.type == aiohttp.WSMsgType.ERROR:
await self._handle_error(cast(Exception, wsmsg.data))
await self._handle_receive_error(cast(Exception, wsmsg.data))
elif wsmsg.type == aiohttp.WSMsgType.CLOSED:
msg = 'WebSocket Closed'
exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg)
await self._handle_error(exc)
await self._handle_receive_error(exc)
else:
await self._handle_error(ValueError(wsmsg))
await self._handle_receive_error(ValueError(wsmsg))
@staticmethod
async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]:
loop = asyncio.get_running_loop()
op, msg = await loop.run_in_executor(None, Frame.decode, data)
if op == WS.OP_MESSAGE:
msg = cast(List[str], msg)
return [json.loads(m) for m in msg]
elif op == WS.OP_HEARTBEAT_REPLY:
return None
else:
return None
try:
op, msg = await loop.run_in_executor(None, Frame.decode, data)
if op == WS.OP_MESSAGE:
msg = cast(List[str], msg)
return [json.loads(m) for m in msg]
elif op == WS.OP_HEARTBEAT_REPLY:
pass
except Exception as e:
logger.warning(f'Failed to handle data: {repr(e)}, data: {repr(data)}')
async def _handle_error(self, exc: Exception) -> None:
logger.debug(f'Failed to receive message due to: {repr(exc)}')
return None
async def _handle_receive_error(self, exc: Exception) -> None:
logger.warning(f'Failed to receive message: {repr(exc)}')
await self._emit('error_occurred', exc)
if isinstance(exc, asyncio.TimeoutError):
return
await self._retry()
def _reset_retry(self) -> None:
self._retry_count = 0
self._retry_delay = 0
async def _retry(self) -> None:
if self._retry_count < self._MAX_RETRIES:
if self._retry_delay > 0: