diff --git a/src/blrec/bili/live_monitor.py b/src/blrec/bili/live_monitor.py index a1c644e..5ab774b 100644 --- a/src/blrec/bili/live_monitor.py +++ b/src/blrec/bili/live_monitor.py @@ -1,13 +1,17 @@ +import asyncio import logging +import random +from contextlib import suppress +from blrec.exception import exception_callback +from blrec.logging.room_id import aio_task_with_room_id -from .danmaku_client import DanmakuClient, DanmakuListener, DanmakuCommand -from .live import Live -from .typing import Danmaku -from .models import LiveStatus, RoomInfo -from ..event.event_emitter import EventListener, EventEmitter +from ..event.event_emitter import EventEmitter, EventListener from ..utils.mixins import SwitchableMixin - +from .danmaku_client import DanmakuClient, DanmakuCommand, DanmakuListener +from .live import Live +from .models import LiveStatus, RoomInfo +from .typing import Danmaku __all__ = 'LiveMonitor', 'LiveEventListener' @@ -37,9 +41,7 @@ class LiveEventListener(EventListener): ... -class LiveMonitor( - EventEmitter[LiveEventListener], DanmakuListener, SwitchableMixin -): +class LiveMonitor(EventEmitter[LiveEventListener], DanmakuListener, SwitchableMixin): def __init__(self, danmaku_client: DanmakuClient, live: Live) -> None: super().__init__() self._danmaku_client = danmaku_client @@ -49,18 +51,49 @@ class LiveMonitor( self._previous_status = self._live.room_info.live_status if self._live.is_living(): self._status_count = 2 + self._stream_available = True else: self._status_count = 0 + self._stream_available = False def _do_enable(self) -> None: self._init_status() self._danmaku_client.add_listener(self) + self._start_polling() logger.debug('Enabled live monitor') def _do_disable(self) -> None: self._danmaku_client.remove_listener(self) + asyncio.create_task(self._stop_polling()) + asyncio.create_task(self._stop_checking()) logger.debug('Disabled live monitor') + def _start_polling(self) -> None: + self._polling_task = asyncio.create_task(self._poll_live_status()) + self._polling_task.add_done_callback(exception_callback) + logger.debug('Started polling live status') + + async def _stop_polling(self) -> None: + self._polling_task.cancel() + with suppress(asyncio.CancelledError): + await self._polling_task + del self._polling_task + logger.debug('Stopped polling live status') + + def _start_checking(self) -> None: + self._checking_task = asyncio.create_task(self._check_if_stream_available()) + self._checking_task.add_done_callback(exception_callback) + logger.debug('Started checking if stream available') + + async def _stop_checking(self) -> None: + if not hasattr(self, '_checking_task'): + return + self._checking_task.cancel() + with suppress(asyncio.CancelledError): + await self._checking_task + del self._checking_task + logger.debug('Stopped checking if stream available') + async def on_client_reconnected(self) -> None: # check the live status after the client reconnected and simulate # events if necessary. @@ -100,41 +133,57 @@ class LiveMonitor( await self._emit('room_changed', self._live.room_info) async def _handle_status_change(self, current_status: LiveStatus) -> None: - logger.debug('Live status changed from {} to {}'.format( - self._previous_status.name, current_status.name - )) - - await self._live.update_room_info() - if (s := self._live.room_info.live_status) != current_status: - logger.warning( - 'Updated live status {} is inconsistent with ' - 'current live status {}'.format(s.name, current_status.name) + logger.debug( + 'Live status changed from {} to {}'.format( + self._previous_status.name, current_status.name ) - - await self._emit( - 'live_status_changed', current_status, self._previous_status ) + await self._emit('live_status_changed', current_status, self._previous_status) + if current_status != LiveStatus.LIVE: self._status_count = 0 + self._stream_available = False await self._emit('live_ended', self._live) else: self._status_count += 1 if self._status_count == 1: assert self._previous_status != LiveStatus.LIVE + self._start_checking() await self._emit('live_began', self._live) elif self._status_count == 2: assert self._previous_status == LiveStatus.LIVE - await self._emit('live_stream_available', self._live) + if not self._stream_available: + self._stream_available = True + await self._stop_checking() + await self._emit('live_stream_available', self._live) elif self._status_count > 2: assert self._previous_status == LiveStatus.LIVE await self._emit('live_stream_reset', self._live) else: pass - logger.debug('Number of sequential LIVE status: {}'.format( - self._status_count - )) + logger.debug('Number of sequential LIVE status: {}'.format(self._status_count)) self._previous_status = current_status + + @aio_task_with_room_id + async def _poll_live_status(self) -> None: + while True: + await asyncio.sleep(600 + random.randrange(-60, 60)) + await self._live.update_room_info() + current_status = self._live.room_info.live_status + if current_status != self._previous_status: + await self._handle_status_change(current_status) + + @aio_task_with_room_id + async def _check_if_stream_available(self) -> None: + while not self._stream_available: + try: + self._live.get_live_stream_url() + except Exception: + await asyncio.sleep(1) + else: + self._stream_available = True + await self._emit('live_stream_available', self._live) diff --git a/src/blrec/core/recorder.py b/src/blrec/core/recorder.py index a434f8d..54daff4 100644 --- a/src/blrec/core/recorder.py +++ b/src/blrec/core/recorder.py @@ -14,7 +14,6 @@ from blrec.bili.models import RoomInfo from blrec.bili.typing import QualityNumber, StreamFormat from blrec.event.event_emitter import EventEmitter, EventListener from blrec.flv.operators import MetaData, StreamProfile -from blrec.logging.room_id import aio_task_with_room_id from blrec.setting.typing import RecordingMode from blrec.utils.mixins import AsyncStoppableMixin @@ -457,8 +456,6 @@ class Recorder( await self._prepare() if self._stream_available: await self._stream_recorder.start() - else: - asyncio.create_task(self._guard()) logger.info('Started recording') await self._emit('recording_started', self) @@ -489,29 +486,6 @@ class Recorder( self._danmaku_dumper.clear_files() self._stream_recorder.clear_files() - @aio_task_with_room_id - async def _guard(self, timeout: float = 60) -> None: - await asyncio.sleep(timeout) - if not self._recording: - return - if self._stream_available: - return - logger.debug( - f'Stream not available in {timeout} seconds, the event maybe lost.' - ) - - await self._live.update_info() - if self._live.is_living(): - logger.debug('The live is living now') - self._stream_available = True - if self._stream_recorder.stopped: - await self._stream_recorder.start() - else: - logger.debug('The live has ended before streaming') - self._stream_available = False - if not self._stream_recorder.stopped: - await self.stop() - def _print_waiting_message(self) -> None: logger.info('Waiting... until the live starts')