添加直播间状态日志

This commit is contained in:
tursom 2024-03-01 16:31:00 +08:00
parent b8350e9c41
commit ad6b606f33
2 changed files with 59 additions and 38 deletions

View File

@ -35,12 +35,12 @@ class BLiveClient(ws_base.WebSocketClientBase):
""" """
def __init__( def __init__(
self, self,
room_id: int, room_id: int,
*, *,
uid: Optional[int] = None, uid: Optional[int] = None,
session: Optional[aiohttp.ClientSession] = None, session: Optional[aiohttp.ClientSession] = None,
heartbeat_interval=30, heartbeat_interval=30,
): ):
super().__init__(session, heartbeat_interval) super().__init__(session, heartbeat_interval)
@ -50,6 +50,8 @@ class BLiveClient(ws_base.WebSocketClientBase):
# 在调用init_room后初始化的字段 # 在调用init_room后初始化的字段
self._room_owner_uid: Optional[int] = None self._room_owner_uid: Optional[int] = None
self.live_status: Optional[int] = None
self.live_start_time: Optional[int] = None
"""主播用户ID""" """主播用户ID"""
self._host_server_list: Optional[List[dict]] = None self._host_server_list: Optional[List[dict]] = None
""" """
@ -120,8 +122,8 @@ class BLiveClient(ws_base.WebSocketClientBase):
try: try:
async with self._session.get( async with self._session.get(
UID_INIT_URL, UID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT}, headers={'User-Agent': utils.USER_AGENT},
) as res: ) as res:
if res.status != 200: if res.status != 200:
logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id, logger.warning('room=%d _init_uid() failed, status=%d, reason=%s', self._tmp_room_id,
@ -158,8 +160,8 @@ class BLiveClient(ws_base.WebSocketClientBase):
async def _init_buvid(self): async def _init_buvid(self):
try: try:
async with self._session.get( async with self._session.get(
BUVID_INIT_URL, BUVID_INIT_URL,
headers={'User-Agent': utils.USER_AGENT}, headers={'User-Agent': utils.USER_AGENT},
) as res: ) as res:
if res.status != 200: if res.status != 200:
logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s', logger.warning('room=%d _init_buvid() status error, status=%d, reason=%s',
@ -171,11 +173,11 @@ class BLiveClient(ws_base.WebSocketClientBase):
async def _init_room_id_and_owner(self): async def _init_room_id_and_owner(self):
try: try:
async with self._session.get( async with self._session.get(
ROOM_INIT_URL, ROOM_INIT_URL,
headers={'User-Agent': utils.USER_AGENT}, headers={'User-Agent': utils.USER_AGENT},
params={ params={
'room_id': self._tmp_room_id 'room_id': self._tmp_room_id
}, },
) as res: ) as res:
if res.status != 200: if res.status != 200:
logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id, logger.warning('room=%d _init_room_id_and_owner() failed, status=%d, reason=%s', self._tmp_room_id,
@ -197,17 +199,19 @@ class BLiveClient(ws_base.WebSocketClientBase):
room_info = data['room_info'] room_info = data['room_info']
self._room_id = room_info['room_id'] self._room_id = room_info['room_id']
self._room_owner_uid = room_info['uid'] self._room_owner_uid = room_info['uid']
self.live_status = room_info['live_status']
self.live_start_time = room_info['live_start_time']
return True return True
async def _init_host_server(self): async def _init_host_server(self):
try: try:
async with self._session.get( async with self._session.get(
DANMAKU_SERVER_CONF_URL, DANMAKU_SERVER_CONF_URL,
headers={'User-Agent': utils.USER_AGENT}, headers={'User-Agent': utils.USER_AGENT},
params={ params={
'id': self._room_id, 'id': self._room_id,
'type': 0 'type': 0
}, },
) as res: ) as res:
if res.status != 200: if res.status != 200:
logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id, logger.warning('room=%d _init_host_server() failed, status=%d, reason=%s', self._room_id,

View File

@ -13,13 +13,16 @@ import redis
import blivedm import blivedm
from blivedm.clients import ws_base from blivedm.clients import ws_base
os.makedirs("logs", exist_ok=True)
room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8")
# set log level to INFO # set log level to INFO
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger() logger = logging.getLogger()
room_list_key = "bilibili.live.danmu.room_list" room_list_key = "bilibili.live.danmu.room_list"
r = redis.Redis(host='redis', port=6379, db=0) r = redis.Redis(host="redis", port=6379, db=0)
TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))] TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))]
clients = {} clients = {}
@ -39,12 +42,7 @@ async def main():
for room_id in TEST_ROOM_IDS: for room_id in TEST_ROOM_IDS:
client = None client = None
try: try:
logger.info("connect to room %s", room_id) client = await connect_room(room_id)
client = blivedm.BLiveClient(room_id, session=session)
client.set_handler(jsonl_logger)
client.start()
clients[room_id] = client
logger.info("connect to room %s success", room_id)
except Exception as e: except Exception as e:
if client is not None: if client is not None:
await client.stop_and_close() await client.stop_and_close()
@ -65,7 +63,32 @@ async def main():
await session.close() await session.close()
async def connect_room(room_id):
logger.info("connect to room %s", room_id)
client = blivedm.BLiveClient(room_id, session=session)
client.set_handler(jsonl_logger)
await client.init_room()
client._need_init_room = False
client.start()
clients[room_id] = client
logger.info("connect to room %s success", room_id)
room_status_log.write(json.dumps(
{
"room_id": room_id,
"live_status": client.live_status,
"live_start_time": client.live_start_time,
},
separators=(",", ":"),
ensure_ascii=False,
))
room_status_log.write("\n")
return client
def flush_files(): def flush_files():
room_status_log.flush()
for room_id in log_files: for room_id in log_files:
log_files[room_id].flush() log_files[room_id].flush()
@ -97,12 +120,7 @@ def room_changed_handler(message):
for added in TEST_ROOM_IDS - clients.keys(): for added in TEST_ROOM_IDS - clients.keys():
client = None client = None
try: try:
logger.info("connect to room %s", added) client = asyncio.run(connect_room(added))
client = blivedm.BLiveClient(added, session=session)
client.set_handler(jsonl_logger)
client.start()
clients[added] = client
logger.info("connect to room %s success", added)
except Exception as e: except Exception as e:
if client is not None: if client is not None:
asyncio.run(client.stop_and_close()) asyncio.run(client.stop_and_close())
@ -112,8 +130,8 @@ def room_changed_handler(message):
def init_session(): def init_session():
cookies = http.cookies.SimpleCookie() cookies = http.cookies.SimpleCookie()
cookies['SESSDATA'] = SESSDATA cookies["SESSDATA"] = SESSDATA
cookies['SESSDATA']['domain'] = 'bilibili.com' cookies["SESSDATA"]["domain"] = "bilibili.com"
global session global session
session = aiohttp.ClientSession() session = aiohttp.ClientSession()
@ -123,7 +141,6 @@ def init_session():
class JsonlLogger(blivedm.BaseHandler): class JsonlLogger(blivedm.BaseHandler):
def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict): def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict):
if client.room_id not in log_files: if client.room_id not in log_files:
os.makedirs("logs", exist_ok=True)
log_files[client.room_id] = open(f"logs/{client.room_id}.jsonl", "a", encoding="utf-8") log_files[client.room_id] = open(f"logs/{client.room_id}.jsonl", "a", encoding="utf-8")
log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'), ensure_ascii=False)}\n") log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'), ensure_ascii=False)}\n")
@ -131,5 +148,5 @@ class JsonlLogger(blivedm.BaseHandler):
jsonl_logger = JsonlLogger() jsonl_logger = JsonlLogger()
if __name__ == '__main__': if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())