feat: web api is preferred

This commit is contained in:
acgnhik 2022-05-31 21:19:46 +08:00
parent a460c1e9be
commit 1d3423988f
3 changed files with 87 additions and 64 deletions

View File

@ -1,27 +1,22 @@
import json
import struct
import asyncio
import json
import logging
from enum import IntEnum, Enum
import struct
from contextlib import suppress
from typing import Any, Dict, Final, Tuple, List, Union, cast, Optional
from enum import Enum, IntEnum
from typing import Any, Dict, Final, List, Optional, Tuple, Union, cast
import aiohttp
from aiohttp import ClientSession
import brotli
from tenacity import (
retry,
wait_exponential,
retry_if_exception_type,
)
from aiohttp import ClientSession
from tenacity import retry, retry_if_exception_type, wait_exponential
from .api import AppApi, WebApi
from .typing import Danmaku
from ..event.event_emitter import EventListener, EventEmitter
from ..event.event_emitter import EventEmitter, EventListener
from ..exception import exception_callback
from ..utils.mixins import AsyncStoppableMixin
from ..logging.room_id import aio_task_with_room_id
from ..utils.mixins import AsyncStoppableMixin
from .api import AppApi, WebApi
from .typing import ApiPlatform, Danmaku
__all__ = 'DanmakuClient', 'DanmakuListener', 'Danmaku', 'DanmakuCommand'
@ -64,6 +59,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
self.webapi = webapi
self._room_id = room_id
self._api_platform: ApiPlatform = 'web'
self._host_index: int = 0
self._retry_delay: int = 0
self._MAX_RETRIES: Final[int] = max_retries
@ -90,51 +86,72 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
@retry(
wait=wait_exponential(multiplier=0.1, max=60),
retry=retry_if_exception_type((
asyncio.TimeoutError, aiohttp.ClientError,
)),
retry=retry_if_exception_type((asyncio.TimeoutError, aiohttp.ClientError)),
)
async def _connect(self) -> None:
logger.debug('Connecting to server...')
await self._connect_websocket()
await self._send_auth()
reply = await self._recieve_auth_reply()
await self._handle_auth_reply(reply)
logger.debug('Connected to server')
await self._emit('client_connected')
try:
await self._connect_websocket()
await self._send_auth()
reply = await self._recieve_auth_reply()
await self._handle_auth_reply(reply)
except Exception:
self._rotate_api_platform()
await self._update_danmu_info()
raise
else:
logger.debug('Connected to server')
await self._emit('client_connected')
async def _connect_websocket(self) -> None:
url = 'wss://{}:{}/sub'.format(
self._danmu_info['host_list'][self._host_index]['host'],
self._danmu_info['host_list'][self._host_index]['wss_port'],
)
logger.debug(f'Connecting WebSocket... {url}')
try:
self._ws = await self.session.ws_connect(url, timeout=5)
except BaseException:
except Exception as exc:
logger.debug(f'Failed to connect WebSocket: {repr(exc)}')
host_count = len(self._danmu_info['host_list'])
self._host_index = (self._host_index + 1) % host_count
raise
logger.debug('Established WebSocket connection')
else:
logger.debug('Connected WebSocket')
async def _send_auth(self) -> None:
auth_msg = json.dumps({
'uid': 0,
'roomid': self._room_id, # must not be the short id!
'protover': WS.BODY_PROTOCOL_VERSION_BROTLI,
'platform': 'web',
'type': 2,
'key': self._danmu_info['token'],
})
auth_msg = json.dumps(
{
'uid': 0,
'roomid': self._room_id, # must not be the short id!
'protover': WS.BODY_PROTOCOL_VERSION_BROTLI,
'platform': 'web',
'type': 2,
'key': self._danmu_info['token'],
}
)
data = Frame.encode(WS.OP_USER_AUTHENTICATION, auth_msg)
await self._ws.send_bytes(data)
logger.debug('Sent user authentication')
logger.debug('Sending user authentication...')
try:
await self._ws.send_bytes(data)
except Exception as exc:
logger.debug(f'Failed to sent user authentication: {repr(exc)}')
raise
else:
logger.debug('Sent user authentication')
async def _recieve_auth_reply(self) -> aiohttp.WSMessage:
msg = await self._ws.receive(timeout=5)
if msg.type != aiohttp.WSMsgType.BINARY:
raise aiohttp.ClientError(msg)
logger.debug('Recieved reply')
return msg
logger.debug('Receiving user authentication reply...')
try:
msg = await self._ws.receive(timeout=5)
if msg.type != aiohttp.WSMsgType.BINARY:
raise aiohttp.ClientError(msg)
except Exception as exc:
logger.debug(f'Failed to receive user authentication reply: {repr(exc)}')
raise
else:
logger.debug('Recieved user authentication reply')
return msg
async def _handle_auth_reply(self, reply: aiohttp.WSMessage) -> None:
op, msg = Frame.decode(reply.data)
@ -152,11 +169,18 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
else:
raise ValueError(f'Unexpected code: {code}')
def _rotate_api_platform(self) -> None:
if self._api_platform == 'android':
self._api_platform = 'web'
else:
self._api_platform = 'android'
async def _update_danmu_info(self) -> None:
try:
self._danmu_info = await self.appapi.get_danmu_info(self._room_id)
except Exception:
logger.debug(f'Updating danmu info via {self._api_platform} api...')
if self._api_platform == 'web':
self._danmu_info = await self.webapi.get_danmu_info(self._room_id)
else:
self._danmu_info = await self.appapi.get_danmu_info(self._room_id)
logger.debug('Danmu info updated')
async def _disconnect(self) -> None:
@ -217,7 +241,7 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
wsmsg = await self._ws.receive(timeout=self._HEARTBEAT_INTERVAL)
if wsmsg.type == aiohttp.WSMsgType.BINARY:
if (result := await self._handle_data(wsmsg.data)):
if result := await self._handle_data(wsmsg.data):
return result
elif wsmsg.type == aiohttp.WSMsgType.ERROR:
await self._handle_error(cast(Exception, wsmsg.data))
@ -249,10 +273,11 @@ class DanmakuClient(EventEmitter[DanmakuListener], AsyncStoppableMixin):
async def _retry(self) -> None:
if self._retry_count < self._MAX_RETRIES:
if self._retry_delay > 0:
logger.debug('Retry after {} second{}'.format(
self._retry_delay,
's' if self._retry_delay > 1 else '',
))
logger.debug(
'Retry after {} second{}'.format(
self._retry_delay, 's' if self._retry_delay > 1 else ''
)
)
await asyncio.sleep(self._retry_delay)
await self.reconnect()
self._retry_count += 1
@ -285,9 +310,7 @@ class Frame:
@staticmethod
def decode(data: bytes) -> Tuple[int, Union[int, str, List[str]]]:
plen, hlen, ver, op, _ = struct.unpack_from(
Frame.HEADER_FORMAT, data, 0
)
plen, hlen, ver, op, _ = struct.unpack_from(Frame.HEADER_FORMAT, data, 0)
body = data[hlen:]
if op == WS.OP_MESSAGE:
@ -300,7 +323,7 @@ class Frame:
plen, hlen, ver, op, _ = struct.unpack_from(
Frame.HEADER_FORMAT, data, offset
)
body = data[hlen + offset:plen + offset]
body = data[hlen + offset : plen + offset]
msg = body.decode('utf8')
msg_list.append(msg)
offset += plen

View File

@ -157,11 +157,11 @@ class Live:
)
async def get_user_info(self, uid: int) -> UserInfo:
try:
user_info_data = await self._appapi.get_user_info(uid)
return UserInfo.from_app_api_data(user_info_data)
except Exception:
user_info_data = await self._webapi.get_user_info(uid)
return UserInfo.from_web_api_data(user_info_data)
except Exception:
user_info_data = await self._appapi.get_user_info(uid)
return UserInfo.from_app_api_data(user_info_data)
async def get_server_timestamp(self) -> int:
# the timestamp on the server at the moment in seconds
@ -171,15 +171,15 @@ class Live:
self,
qn: QualityNumber = 10000,
*,
api_platform: ApiPlatform = 'android',
api_platform: ApiPlatform = 'web',
stream_format: StreamFormat = 'flv',
stream_codec: StreamCodec = 'avc',
select_alternative: bool = False,
) -> str:
if api_platform == 'android':
info = await self._appapi.get_room_play_info(self._room_id, qn)
else:
if api_platform == 'web':
info = await self._webapi.get_room_play_info(self._room_id, qn)
else:
info = await self._appapi.get_room_play_info(self._room_id, qn)
self._check_room_play_info(info)
@ -224,11 +224,11 @@ class Live:
async def _get_room_info_via_api(self) -> ResponseData:
try:
info_data = await self._appapi.get_info_by_room(self._room_id)
info_data = await self._webapi.get_info_by_room(self._room_id)
room_info_data = info_data['room_info']
except Exception:
try:
info_data = await self._webapi.get_info_by_room(self._room_id)
info_data = await self._appapi.get_info_by_room(self._room_id)
room_info_data = info_data['room_info']
except Exception:
room_info_data = await self._webapi.get_info(self._room_id)

View File

@ -30,7 +30,7 @@ class StreamParamHolder:
*,
stream_format: StreamFormat = 'flv',
quality_number: QualityNumber = 10000,
api_platform: ApiPlatform = 'android',
api_platform: ApiPlatform = 'web',
use_alternative_stream: bool = False,
) -> None:
super().__init__()
@ -43,7 +43,7 @@ class StreamParamHolder:
def reset(self) -> None:
self._real_quality_number = None
self._api_platform = 'android'
self._api_platform = 'web'
self._use_alternative_stream = False
self._cancelled = False