# -*- coding: utf-8 -*-
"""Record bilibili live-room messages to per-room JSONL files.

The set of rooms to record is stored in a Redis list. One ``blivedm`` client
is started per room, and a background thread listens for Redis keyspace
notifications on that list so rooms can be added or removed at runtime.

Threading model
---------------
* The asyncio event loop (started by ``asyncio.run(main())``) owns every
  ``blivedm`` client and the shared ``aiohttp`` session.
* A daemon thread runs ``subscribe_redis``; it never touches clients directly
  but hands the new room list to the event loop.
* A chain of daemon ``threading.Timer`` threads runs ``flush_files`` once per
  second.
* ``log_files`` and the file objects in it are shared between threads and are
  only accessed while holding ``_log_files_lock``.
"""
import asyncio
import http.cookies
import json
import logging
import os
import threading
from typing import Optional, Set

import aiohttp
import redis

import blivedm
from blivedm.clients import ws_base

# set log level to INFO
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

room_list_key = "bilibili.live.danmu.room_list"

r = redis.Redis(host='redis', port=6379, db=0)


def _load_room_ids() -> Set[int]:
    """Return the room ids currently stored in the Redis list, de-duplicated."""
    return {int(room_id) for room_id in r.lrange(room_list_key, 0, -1)}


# Rooms that should currently be recorded. Always a set (the list may contain
# duplicates); replaced wholesale whenever the Redis list changes.
TEST_ROOM_IDS = _load_room_ids()

# room_id -> running blivedm client. Only touched from the event loop thread.
clients = {}
# room_id -> open JSONL file. Guarded by _log_files_lock.
log_files = {}
_log_files_lock = threading.Lock()

with open("cookie.txt", "r", encoding="utf-8") as f:
    SESSDATA = f.read().strip()

session: Optional[aiohttp.ClientSession] = None

# Event loop running main(); set by main() so that other threads can schedule
# work onto it with asyncio.run_coroutine_threadsafe.
_event_loop: Optional[asyncio.AbstractEventLoop] = None


async def main():
    """Start one client per configured room and run until cancelled.

    On the way out (including cancellation, e.g. Ctrl-C) every client is
    stopped, all log files are closed and the HTTP session is closed.
    """
    global _event_loop
    _event_loop = asyncio.get_running_loop()

    flush_files()
    init_session()
    try:
        for room_id in TEST_ROOM_IDS:
            await _start_client(room_id)

        # Run subscribe_redis on its own thread. It is a daemon so that it
        # cannot keep the process alive once main() has finished.
        threading.Thread(target=subscribe_redis, daemon=True).start()

        # The clients run in the background; park here until this task is
        # cancelled. (Re-gathering client.join() in a loop would spin without
        # yielding whenever there are no clients to wait for.)
        await asyncio.Event().wait()
    finally:
        try:
            # Iterate over a snapshot: a room-list update may modify `clients`
            # while we are suspended in stop_and_close().
            for client in list(clients.values()):
                try:
                    await client.stop_and_close()
                except Exception:
                    logger.exception("failed to stop client for room %s", client.room_id)
        finally:
            _close_all_log_files()
            if session is not None:
                await session.close()


async def _start_client(room_id):
    """Create, start and register the client for *room_id*.

    Must run on the event loop thread. If anything fails after the client
    object was created, the client is closed again and the exception is
    re-raised.
    """
    client = None
    try:
        logger.info("connect to room %s", room_id)
        client = blivedm.BLiveClient(room_id, session=session)
        client.set_handler(jsonl_logger)
        client.start()
        clients[room_id] = client
        logger.info("connect to room %s success", room_id)
    except Exception:
        if client is not None:
            await client.stop_and_close()
        raise


async def _apply_room_list(wanted: Set[int]) -> None:
    """Stop clients for rooms not in *wanted* and start clients for new ones.

    Runs on the event loop thread. A failure for one room is logged and does
    not prevent the remaining rooms from being processed.
    """
    global TEST_ROOM_IDS
    TEST_ROOM_IDS = wanted

    for removed in clients.keys() - wanted:
        client = clients.pop(removed)
        try:
            await client.stop_and_close()
        except Exception:
            logger.exception("failed to stop client for room %s", removed)
        _close_log_file(removed)

    for added in wanted - clients.keys():
        try:
            await _start_client(added)
        except Exception:
            logger.exception("failed to connect to room %s", added)


def _close_log_file(room_id) -> None:
    """Close and forget the log file of *room_id*, if one is open."""
    with _log_files_lock:
        log_file = log_files.pop(room_id, None)
        if log_file is not None:
            log_file.close()


def _close_all_log_files() -> None:
    """Close every open log file (closing also flushes buffered lines)."""
    with _log_files_lock:
        for log_file in log_files.values():
            log_file.close()
        log_files.clear()


def flush_files():
    """Flush all open log files, then schedule the next flush in one second.

    The next timer is started in ``finally`` so that a failed flush cannot
    break the chain, and it is a daemon thread so it never blocks interpreter
    exit.
    """
    try:
        with _log_files_lock:
            for log_file in log_files.values():
                log_file.flush()
    except OSError:
        logger.exception("failed to flush log files")
    finally:
        timer = threading.Timer(1, flush_files)
        timer.daemon = True
        timer.start()


def subscribe_redis():
    """Listen for keyspace notifications on the room list (blocking).

    ``room_changed_handler`` is registered for the pattern, so redis-py calls
    it for matching notifications; anything else returned by ``get_message``
    (e.g. the subscribe confirmation) is printed.
    """
    p = r.pubsub()
    p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler})
    while True:
        # Block for up to a second instead of polling in a tight loop.
        message = p.get_message(timeout=1.0)
        if message:
            print(message)


# noinspection PyUnusedLocal
def room_changed_handler(message):
    """Pub/sub callback: re-read the room list and reconcile the clients.

    This runs on the subscriber thread. The clients live on the event loop
    that runs ``main``, so the actual start/stop work is submitted to that
    loop rather than executed here (``asyncio.run`` in this thread would use a
    different, temporary event loop). The call waits for the reconciliation to
    finish so that consecutive notifications are applied one at a time.

    Errors are logged instead of raised: an exception escaping this callback
    would terminate the subscriber thread and stop all future updates.
    """
    if _event_loop is None:
        logger.warning("room list changed before the event loop was ready; ignored")
        return
    try:
        wanted = _load_room_ids()
        future = asyncio.run_coroutine_threadsafe(_apply_room_list(wanted), _event_loop)
        future.result()
    except Exception:
        logger.exception("failed to apply room list change")


def init_session():
    """Create the shared aiohttp session carrying the SESSDATA login cookie.

    Called from ``main`` so that the session is created while the event loop
    is running.
    """
    global session
    cookies = http.cookies.SimpleCookie()
    cookies['SESSDATA'] = SESSDATA
    cookies['SESSDATA']['domain'] = 'bilibili.com'

    session = aiohttp.ClientSession()
    session.cookie_jar.update_cookies(cookies)


class JsonlLogger(blivedm.BaseHandler):
    """Handler that appends every received message to ``logs/<room_id>.jsonl``."""

    def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict):
        """Write *message* as one compact JSON line to the room's log file.

        The file is opened in append mode on the first message of a room.
        NOTE(review): presumably invoked by the blivedm base handler for every
        incoming message - confirm against the installed blivedm version.
        """
        line = f"{json.dumps(message, separators=(',', ':'))}\n"
        with _log_files_lock:
            log_file = log_files.get(client.room_id)
            if log_file is None:
                os.makedirs("logs", exist_ok=True)
                log_file = open(f"logs/{client.room_id}.jsonl", "a", encoding="utf-8")
                log_files[client.room_id] = log_file
            log_file.write(line)


jsonl_logger = JsonlLogger()

if __name__ == '__main__':
    asyncio.run(main())