feat: improve live monitoring
This commit is contained in:
parent
50971eeb0e
commit
6469881220
@ -1,13 +1,17 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
from contextlib import suppress
|
||||
|
||||
from blrec.exception import exception_callback
|
||||
from blrec.logging.room_id import aio_task_with_room_id
|
||||
|
||||
from .danmaku_client import DanmakuClient, DanmakuListener, DanmakuCommand
|
||||
from .live import Live
|
||||
from .typing import Danmaku
|
||||
from .models import LiveStatus, RoomInfo
|
||||
from ..event.event_emitter import EventListener, EventEmitter
|
||||
from ..event.event_emitter import EventEmitter, EventListener
|
||||
from ..utils.mixins import SwitchableMixin
|
||||
|
||||
from .danmaku_client import DanmakuClient, DanmakuCommand, DanmakuListener
|
||||
from .live import Live
|
||||
from .models import LiveStatus, RoomInfo
|
||||
from .typing import Danmaku
|
||||
|
||||
__all__ = 'LiveMonitor', 'LiveEventListener'
|
||||
|
||||
@ -37,9 +41,7 @@ class LiveEventListener(EventListener):
|
||||
...
|
||||
|
||||
|
||||
class LiveMonitor(
|
||||
EventEmitter[LiveEventListener], DanmakuListener, SwitchableMixin
|
||||
):
|
||||
class LiveMonitor(EventEmitter[LiveEventListener], DanmakuListener, SwitchableMixin):
|
||||
def __init__(self, danmaku_client: DanmakuClient, live: Live) -> None:
|
||||
super().__init__()
|
||||
self._danmaku_client = danmaku_client
|
||||
@ -49,18 +51,49 @@ class LiveMonitor(
|
||||
self._previous_status = self._live.room_info.live_status
|
||||
if self._live.is_living():
|
||||
self._status_count = 2
|
||||
self._stream_available = True
|
||||
else:
|
||||
self._status_count = 0
|
||||
self._stream_available = False
|
||||
|
||||
def _do_enable(self) -> None:
|
||||
self._init_status()
|
||||
self._danmaku_client.add_listener(self)
|
||||
self._start_polling()
|
||||
logger.debug('Enabled live monitor')
|
||||
|
||||
def _do_disable(self) -> None:
|
||||
self._danmaku_client.remove_listener(self)
|
||||
asyncio.create_task(self._stop_polling())
|
||||
asyncio.create_task(self._stop_checking())
|
||||
logger.debug('Disabled live monitor')
|
||||
|
||||
def _start_polling(self) -> None:
|
||||
self._polling_task = asyncio.create_task(self._poll_live_status())
|
||||
self._polling_task.add_done_callback(exception_callback)
|
||||
logger.debug('Started polling live status')
|
||||
|
||||
async def _stop_polling(self) -> None:
|
||||
self._polling_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await self._polling_task
|
||||
del self._polling_task
|
||||
logger.debug('Stopped polling live status')
|
||||
|
||||
def _start_checking(self) -> None:
|
||||
self._checking_task = asyncio.create_task(self._check_if_stream_available())
|
||||
self._checking_task.add_done_callback(exception_callback)
|
||||
logger.debug('Started checking if stream available')
|
||||
|
||||
async def _stop_checking(self) -> None:
|
||||
if not hasattr(self, '_checking_task'):
|
||||
return
|
||||
self._checking_task.cancel()
|
||||
with suppress(asyncio.CancelledError):
|
||||
await self._checking_task
|
||||
del self._checking_task
|
||||
logger.debug('Stopped checking if stream available')
|
||||
|
||||
async def on_client_reconnected(self) -> None:
|
||||
# check the live status after the client reconnected and simulate
|
||||
# events if necessary.
|
||||
@ -100,41 +133,57 @@ class LiveMonitor(
|
||||
await self._emit('room_changed', self._live.room_info)
|
||||
|
||||
async def _handle_status_change(self, current_status: LiveStatus) -> None:
|
||||
logger.debug('Live status changed from {} to {}'.format(
|
||||
self._previous_status.name, current_status.name
|
||||
))
|
||||
|
||||
await self._live.update_room_info()
|
||||
if (s := self._live.room_info.live_status) != current_status:
|
||||
logger.warning(
|
||||
'Updated live status {} is inconsistent with '
|
||||
'current live status {}'.format(s.name, current_status.name)
|
||||
logger.debug(
|
||||
'Live status changed from {} to {}'.format(
|
||||
self._previous_status.name, current_status.name
|
||||
)
|
||||
|
||||
await self._emit(
|
||||
'live_status_changed', current_status, self._previous_status
|
||||
)
|
||||
|
||||
await self._emit('live_status_changed', current_status, self._previous_status)
|
||||
|
||||
if current_status != LiveStatus.LIVE:
|
||||
self._status_count = 0
|
||||
self._stream_available = False
|
||||
await self._emit('live_ended', self._live)
|
||||
else:
|
||||
self._status_count += 1
|
||||
|
||||
if self._status_count == 1:
|
||||
assert self._previous_status != LiveStatus.LIVE
|
||||
self._start_checking()
|
||||
await self._emit('live_began', self._live)
|
||||
elif self._status_count == 2:
|
||||
assert self._previous_status == LiveStatus.LIVE
|
||||
await self._emit('live_stream_available', self._live)
|
||||
if not self._stream_available:
|
||||
self._stream_available = True
|
||||
await self._stop_checking()
|
||||
await self._emit('live_stream_available', self._live)
|
||||
elif self._status_count > 2:
|
||||
assert self._previous_status == LiveStatus.LIVE
|
||||
await self._emit('live_stream_reset', self._live)
|
||||
else:
|
||||
pass
|
||||
|
||||
logger.debug('Number of sequential LIVE status: {}'.format(
|
||||
self._status_count
|
||||
))
|
||||
logger.debug('Number of sequential LIVE status: {}'.format(self._status_count))
|
||||
|
||||
self._previous_status = current_status
|
||||
|
||||
@aio_task_with_room_id
|
||||
async def _poll_live_status(self) -> None:
|
||||
while True:
|
||||
await asyncio.sleep(600 + random.randrange(-60, 60))
|
||||
await self._live.update_room_info()
|
||||
current_status = self._live.room_info.live_status
|
||||
if current_status != self._previous_status:
|
||||
await self._handle_status_change(current_status)
|
||||
|
||||
@aio_task_with_room_id
|
||||
async def _check_if_stream_available(self) -> None:
|
||||
while not self._stream_available:
|
||||
try:
|
||||
self._live.get_live_stream_url()
|
||||
except Exception:
|
||||
await asyncio.sleep(1)
|
||||
else:
|
||||
self._stream_available = True
|
||||
await self._emit('live_stream_available', self._live)
|
||||
|
@ -14,7 +14,6 @@ from blrec.bili.models import RoomInfo
|
||||
from blrec.bili.typing import QualityNumber, StreamFormat
|
||||
from blrec.event.event_emitter import EventEmitter, EventListener
|
||||
from blrec.flv.operators import MetaData, StreamProfile
|
||||
from blrec.logging.room_id import aio_task_with_room_id
|
||||
from blrec.setting.typing import RecordingMode
|
||||
from blrec.utils.mixins import AsyncStoppableMixin
|
||||
|
||||
@ -457,8 +456,6 @@ class Recorder(
|
||||
await self._prepare()
|
||||
if self._stream_available:
|
||||
await self._stream_recorder.start()
|
||||
else:
|
||||
asyncio.create_task(self._guard())
|
||||
|
||||
logger.info('Started recording')
|
||||
await self._emit('recording_started', self)
|
||||
@ -489,29 +486,6 @@ class Recorder(
|
||||
self._danmaku_dumper.clear_files()
|
||||
self._stream_recorder.clear_files()
|
||||
|
||||
@aio_task_with_room_id
|
||||
async def _guard(self, timeout: float = 60) -> None:
|
||||
await asyncio.sleep(timeout)
|
||||
if not self._recording:
|
||||
return
|
||||
if self._stream_available:
|
||||
return
|
||||
logger.debug(
|
||||
f'Stream not available in {timeout} seconds, the event maybe lost.'
|
||||
)
|
||||
|
||||
await self._live.update_info()
|
||||
if self._live.is_living():
|
||||
logger.debug('The live is living now')
|
||||
self._stream_available = True
|
||||
if self._stream_recorder.stopped:
|
||||
await self._stream_recorder.start()
|
||||
else:
|
||||
logger.debug('The live has ended before streaming')
|
||||
self._stream_available = False
|
||||
if not self._stream_recorder.stopped:
|
||||
await self.stop()
|
||||
|
||||
def _print_waiting_message(self) -> None:
|
||||
logger.info('Waiting... until the live starts')
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user