import asyncio import http.cookies import json import logging import os import signal import threading import time from typing import * import aiohttp import redis_util as redis import blivedm from blivedm.clients import ws_base def room_from_config(): return conf.get("room_ids", []) # noinspection PyUnusedLocal def handle_sighup(signum, frame): global log_files global room_status_log for room_id in log_files: try: log_files[room_id].close() except Exception as e: logger.exception(f"close log_files[{room_id}] failed with {e}") try: room_status_log.close() except Exception as e: logger.exception(f"close room_status_log failed with {e}") log_files = {} room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8") def on_room_changed(new_room_ids): global TEST_ROOM_IDS TEST_ROOM_IDS = list(set(new_room_ids + room_from_config())) for removed in clients.keys() - TEST_ROOM_IDS: asyncio.run(clients[removed].stop_and_close()) del clients[removed] if removed in log_files: log_files[removed].close() del log_files[removed] for added in TEST_ROOM_IDS - clients.keys(): client = None try: client = asyncio.run(connect_room(added)) except Exception as e: if client is not None: asyncio.run(client.stop_and_close()) async def main(): global TEST_ROOM_IDS flush_files() redis.on_room_changed = on_room_changed redis.init(conf.get("redis", None)) TEST_ROOM_IDS = list(set(TEST_ROOM_IDS + redis.get_room_list())) init_session() try: for room_id in TEST_ROOM_IDS: client = None try: client = await connect_room(room_id) except Exception as e: if client is not None: await client.stop_and_close() raise e while True: await asyncio.gather(*( clients[room_id].join() for room_id in clients )) await asyncio.sleep(1) finally: for room_id in clients: await clients[room_id].stop_and_close() await session.close() async def connect_room(room_id): logger.info("connect to room %s", room_id) client = blivedm.BLiveClient(room_id, session=session) client.set_handler(jsonl_logger) await client.init_room() client._need_init_room = False client.start() clients[room_id] = client logger.info("connect to room %s success", room_id) try: room_status_log.write(json.dumps( { "room_id": room_id, "live_status": client.live_status, "live_start_time": client.live_start_time, }, separators=(",", ":"), ensure_ascii=False, )) room_status_log.write("\n") except Exception as e: logger.exception(f"write room_status_log failed with {e}") return client def flush_files(): try: room_status_log.flush() for room_id in log_files: log_files[room_id].flush() except Exception as e: logger.exception(f"flush_files failed with {e}") timer = threading.Timer(1, flush_files) timer.daemon = True timer.start() def init_session(): global session, cookies session = aiohttp.ClientSession() session.cookie_jar.update_cookies(cookies) class JsonlLogger(blivedm.BaseHandler): def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict): if client.room_id not in log_files: log_files[client.room_id] = open( f"logs/{client.room_id}.jsonl", "a", encoding="utf-8") if "cmd" in message and message["cmd"] == "PREPARING": message["timestamp"] = int(time.time()) log_files[client.room_id].write( f"{json.dumps(message, separators=(',', ':'), ensure_ascii=False)}\n") # set log level to INFO logging.basicConfig(level=logging.INFO) try: with open("config.json") as conf_f: conf = json.load(conf_f) except FileNotFoundError: conf = {} redis_conf: dict | None = conf.get("redis", None) os.makedirs("logs", exist_ok=True) room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8") logger = logging.getLogger() clients = {} log_files = {} with open("cookie.txt", "r", encoding="utf-8") as f: cookies = http.cookies.SimpleCookie() cookies.load(f.read().strip()) cookies["SESSDATA"]["domain"] = "bilibili.com" session: Optional[aiohttp.ClientSession] = None TEST_ROOM_IDS: list[int] = room_from_config() signal.signal(signal.SIGHUP, handle_sighup) jsonl_logger = JsonlLogger() if __name__ == "__main__": asyncio.run(main())