diff --git a/README.md b/README.md index 4c6fb3d..93fd2f9 100644 --- a/README.md +++ b/README.md @@ -15,4 +15,4 @@ Python获取bilibili直播弹幕的库,使用WebSocket协议,支持web端和 pip install -r requirements.txt ``` -3. 例程看[sample.py](./sample.py)和[open_live_sample.py](./open_live_sample.py) +3. 例程看[sample.py](./main.py)和[open_live_sample.py](./open_live_sample.py) diff --git a/sample.py b/main.py similarity index 85% rename from sample.py rename to main.py index cf9b29a..3653347 100644 --- a/sample.py +++ b/main.py @@ -9,43 +9,17 @@ import time from typing import * import aiohttp -import redis +import redis_util as redis import blivedm from blivedm.clients import ws_base -try: - with open("config.json") as conf_f: - conf = json.load(conf_f) -except FileNotFoundError: - conf = {} -redis_conf = conf.get("redis", {}) -os.makedirs("logs", exist_ok=True) -room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8") - -# set log level to INFO -logging.basicConfig(level=logging.INFO) - -logger = logging.getLogger() - -room_list_key = "bilibili.live.danmu.room_list" -r = redis.Redis( - host=redis_conf.get("host", "127.0.0.1"), - port=redis_conf.get("port", 6379), - db=redis_conf.get("port", 0), -) - -TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))] -clients = {} -log_files = {} - -with open("cookie.txt", "r", encoding="utf-8") as f: - SESSDATA = f.read().strip() - -session: Optional[aiohttp.ClientSession] = None +def room_from_config(): + return conf.get("room_ids", []) +# noinspection PyUnusedLocal def handle_sighup(signum, frame): global log_files global room_status_log @@ -64,12 +38,37 @@ def handle_sighup(signum, frame): room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8") -signal.signal(signal.SIGHUP, handle_sighup) +def on_room_changed(new_room_ids): + global TEST_ROOM_IDS + + TEST_ROOM_IDS = list(set(new_room_ids + room_from_config())) + + for removed in clients.keys() - TEST_ROOM_IDS: + asyncio.run(clients[removed].stop_and_close()) + del clients[removed] + + if removed in log_files: + log_files[removed].close() + del log_files[removed] + + for added in TEST_ROOM_IDS - clients.keys(): + client = None + try: + client = asyncio.run(connect_room(added)) + except Exception as e: + if client is not None: + asyncio.run(client.stop_and_close()) async def main(): + global TEST_ROOM_IDS + flush_files() + redis.on_room_changed = on_room_changed + redis.init(conf.get("redis", None)) + TEST_ROOM_IDS = list(set(TEST_ROOM_IDS + redis.get_room_list())) + init_session() try: for room_id in TEST_ROOM_IDS: @@ -82,9 +81,6 @@ async def main(): raise e - # run subscribe_redis on new thread - threading.Thread(target=subscribe_redis, daemon=True).start() - while True: await asyncio.gather(*( clients[room_id].join() for room_id in clients @@ -135,35 +131,6 @@ def flush_files(): timer.start() -def subscribe_redis(): - p = r.pubsub() - p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler}) - for message in p.listen(): - pass - - -# noinspection PyUnusedLocal -def room_changed_handler(message): - global TEST_ROOM_IDS - TEST_ROOM_IDS = set(int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))) - - for removed in clients.keys() - TEST_ROOM_IDS: - asyncio.run(clients[removed].stop_and_close()) - del clients[removed] - - if removed in log_files: - log_files[removed].close() - del log_files[removed] - - for added in TEST_ROOM_IDS - clients.keys(): - client = None - try: - client = asyncio.run(connect_room(added)) - except Exception as e: - if client is not None: - asyncio.run(client.stop_and_close()) - - def init_session(): cookies = http.cookies.SimpleCookie() cookies["SESSDATA"] = SESSDATA @@ -185,6 +152,33 @@ class JsonlLogger(blivedm.BaseHandler): log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'), ensure_ascii=False)}\n") +# set log level to INFO +logging.basicConfig(level=logging.INFO) + +try: + with open("config.json") as conf_f: + conf = json.load(conf_f) +except FileNotFoundError: + conf = {} +redis_conf: dict | None = conf.get("redis", None) + +os.makedirs("logs", exist_ok=True) +room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8") + +logger = logging.getLogger() + +clients = {} +log_files = {} + +with open("cookie.txt", "r", encoding="utf-8") as f: + SESSDATA = f.read().strip() + +session: Optional[aiohttp.ClientSession] = None + +TEST_ROOM_IDS: list[int] = room_from_config() + +signal.signal(signal.SIGHUP, handle_sighup) + jsonl_logger = JsonlLogger() if __name__ == "__main__": diff --git a/redis_util.py b/redis_util.py new file mode 100644 index 0000000..c57ffd1 --- /dev/null +++ b/redis_util.py @@ -0,0 +1,43 @@ +import threading +from typing import Callable + +import redis + +room_list_key = "bilibili.live.danmu.room_list" +r: redis.Redis | None = None +on_room_changed: Callable[[list[int]], None] = lambda ignore: None + + +def init(redis_conf: dict | None): + global r + + if redis_conf is None: + return + + r = redis.Redis( + host=redis_conf.get("host", "127.0.0.1"), + port=redis_conf.get("port", 6379), + db=redis_conf.get("port", 0), + ) + + # run subscribe_redis on new thread + threading.Thread(target=subscribe_redis, daemon=True).start() + + +def get_room_list() -> list[int]: + if r is None: + return [] + + return [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))] + + +def subscribe_redis(): + p = r.pubsub() + p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler}) + for message in p.listen(): + pass + + +# noinspection PyUnusedLocal +def room_changed_handler(ignore): + on_room_changed(get_room_list())