diff --git a/src/blrec/bili/danmaku_client.py b/src/blrec/bili/danmaku_client.py index a8673d3..6ec47aa 100644 --- a/src/blrec/bili/danmaku_client.py +++ b/src/blrec/bili/danmaku_client.py @@ -52,7 +52,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): webapi: WebApi, room_id: int, *, - max_retries: int = 10, + max_retries: int = 60, headers: Optional[Dict[str, str]] = None, ) -> None: super().__init__() @@ -237,7 +237,11 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): try: await self._ws.send_bytes(data) except Exception as exc: - logger.debug(f'Failed to send heartbeat due to: {repr(exc)}') + logger.warning(f'Failed to send heartbeat: {repr(exc)}') + await self._emit('error_occurred', exc) + task = asyncio.create_task(self.restart()) + task.add_done_callback(exception_callback) + break await asyncio.sleep(self._HEARTBEAT_INTERVAL) async def _create_message_loop(self) -> None: @@ -261,48 +265,53 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin): await self._emit('danmaku_received', msg) async def _receive(self) -> List[Dict[str, Any]]: - self._retry_count = 0 - self._retry_delay = 0 + self._reset_retry() while True: try: - wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL) - except asyncio.TimeoutError as e: - logger.debug(f'Failed to receive message due to: {repr(e)}') - continue + wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL * 2) except Exception as e: - await self._handle_error(e) + await self._handle_receive_error(e) else: if wsmsg.type == aiohttp.WSMsgType.BINARY: if result := await self._handle_data(wsmsg.data): return result elif wsmsg.type == aiohttp.WSMsgType.ERROR: - await self._handle_error(cast(Exception, wsmsg.data)) + await self._handle_receive_error(cast(Exception, wsmsg.data)) elif wsmsg.type == aiohttp.WSMsgType.CLOSED: msg = 'WebSocket Closed' exc = aiohttp.WebSocketError(self._ws.close_code or 1006, msg) - await self._handle_error(exc) + await self._handle_receive_error(exc) else: - await self._handle_error(ValueError(wsmsg)) + await self._handle_receive_error(ValueError(wsmsg)) @staticmethod async def _handle_data(data: bytes) -> Optional[List[Dict[str, Any]]]: loop = asyncio.get_running_loop() - op, msg = await loop.run_in_executor(None, Frame.decode, data) - if op == WS.OP_MESSAGE: - msg = cast(List[str], msg) - return [json.loads(m) for m in msg] - elif op == WS.OP_HEARTBEAT_REPLY: - return None - else: - return None + try: + op, msg = await loop.run_in_executor(None, Frame.decode, data) + if op == WS.OP_MESSAGE: + msg = cast(List[str], msg) + return [json.loads(m) for m in msg] + elif op == WS.OP_HEARTBEAT_REPLY: + pass + except Exception as e: + logger.warning(f'Failed to handle data: {repr(e)}, data: {repr(data)}') - async def _handle_error(self, exc: Exception) -> None: - logger.debug(f'Failed to receive message due to: {repr(exc)}') + return None + + async def _handle_receive_error(self, exc: Exception) -> None: + logger.warning(f'Failed to receive message: {repr(exc)}') await self._emit('error_occurred', exc) + if isinstance(exc, asyncio.TimeoutError): + return await self._retry() + def _reset_retry(self) -> None: + self._retry_count = 0 + self._retry_delay = 0 + async def _retry(self) -> None: if self._retry_count < self._MAX_RETRIES: if self._retry_delay > 0: