From 764c5172247055dcbaf5df12dce1993450c61f21 Mon Sep 17 00:00:00 2001
From: tursom <tursom@foxmail.com>
Date: Wed, 20 Mar 2024 11:22:07 +0800
Subject: [PATCH] move redis to separate module

---
 README.md            |   2 +-
 sample.py => main.py | 120 ++++++++++++++++++++-----------------------
 redis_util.py        |  43 ++++++++++++++++
 3 files changed, 101 insertions(+), 64 deletions(-)
 rename sample.py => main.py (85%)
 create mode 100644 redis_util.py

diff --git a/README.md b/README.md
index 4c6fb3d..93fd2f9 100644
--- a/README.md
+++ b/README.md
@@ -15,4 +15,4 @@ Python获取bilibili直播弹幕的库,使用WebSocket协议,支持web端和
     pip install -r requirements.txt
     ```
 
-3. 例程看[sample.py](./sample.py)和[open_live_sample.py](./open_live_sample.py)
+3. 例程看[sample.py](./main.py)和[open_live_sample.py](./open_live_sample.py)
diff --git a/sample.py b/main.py
similarity index 85%
rename from sample.py
rename to main.py
index cf9b29a..3653347 100644
--- a/sample.py
+++ b/main.py
@@ -9,43 +9,17 @@ import time
 from typing import *
 
 import aiohttp
-import redis
+import redis_util as redis
 
 import blivedm
 from blivedm.clients import ws_base
 
-try:
-    with open("config.json") as conf_f:
-        conf = json.load(conf_f)
-except FileNotFoundError:
-    conf = {}
-redis_conf = conf.get("redis", {})
 
-os.makedirs("logs", exist_ok=True)
-room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8")
-
-# set log level to INFO
-logging.basicConfig(level=logging.INFO)
-
-logger = logging.getLogger()
-
-room_list_key = "bilibili.live.danmu.room_list"
-r = redis.Redis(
-    host=redis_conf.get("host", "127.0.0.1"),
-    port=redis_conf.get("port", 6379),
-    db=redis_conf.get("port", 0),
-)
-
-TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))]
-clients = {}
-log_files = {}
-
-with open("cookie.txt", "r", encoding="utf-8") as f:
-    SESSDATA = f.read().strip()
-
-session: Optional[aiohttp.ClientSession] = None
+def room_from_config():
+    return conf.get("room_ids", [])
 
 
+# noinspection PyUnusedLocal
 def handle_sighup(signum, frame):
     global log_files
     global room_status_log
@@ -64,12 +38,37 @@ def handle_sighup(signum, frame):
     room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8")
 
 
-signal.signal(signal.SIGHUP, handle_sighup)
+def on_room_changed(new_room_ids):
+    global TEST_ROOM_IDS
+
+    TEST_ROOM_IDS = list(set(new_room_ids + room_from_config()))
+
+    for removed in clients.keys() - TEST_ROOM_IDS:
+        asyncio.run(clients[removed].stop_and_close())
+        del clients[removed]
+
+        if removed in log_files:
+            log_files[removed].close()
+            del log_files[removed]
+
+    for added in TEST_ROOM_IDS - clients.keys():
+        client = None
+        try:
+            client = asyncio.run(connect_room(added))
+        except Exception as e:
+            if client is not None:
+                asyncio.run(client.stop_and_close())
 
 
 async def main():
+    global TEST_ROOM_IDS
+
     flush_files()
 
+    redis.on_room_changed = on_room_changed
+    redis.init(conf.get("redis", None))
+    TEST_ROOM_IDS = list(set(TEST_ROOM_IDS + redis.get_room_list()))
+
     init_session()
     try:
         for room_id in TEST_ROOM_IDS:
@@ -82,9 +81,6 @@ async def main():
 
                 raise e
 
-        # run subscribe_redis on new thread
-        threading.Thread(target=subscribe_redis, daemon=True).start()
-
         while True:
             await asyncio.gather(*(
                 clients[room_id].join() for room_id in clients
@@ -135,35 +131,6 @@ def flush_files():
     timer.start()
 
 
-def subscribe_redis():
-    p = r.pubsub()
-    p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler})
-    for message in p.listen():
-        pass
-
-
-# noinspection PyUnusedLocal
-def room_changed_handler(message):
-    global TEST_ROOM_IDS
-    TEST_ROOM_IDS = set(int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1)))
-
-    for removed in clients.keys() - TEST_ROOM_IDS:
-        asyncio.run(clients[removed].stop_and_close())
-        del clients[removed]
-
-        if removed in log_files:
-            log_files[removed].close()
-            del log_files[removed]
-
-    for added in TEST_ROOM_IDS - clients.keys():
-        client = None
-        try:
-            client = asyncio.run(connect_room(added))
-        except Exception as e:
-            if client is not None:
-                asyncio.run(client.stop_and_close())
-
-
 def init_session():
     cookies = http.cookies.SimpleCookie()
     cookies["SESSDATA"] = SESSDATA
@@ -185,6 +152,33 @@ class JsonlLogger(blivedm.BaseHandler):
         log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'), ensure_ascii=False)}\n")
 
 
+# set log level to INFO
+logging.basicConfig(level=logging.INFO)
+
+try:
+    with open("config.json") as conf_f:
+        conf = json.load(conf_f)
+except FileNotFoundError:
+    conf = {}
+redis_conf: dict | None = conf.get("redis", None)
+
+os.makedirs("logs", exist_ok=True)
+room_status_log = open("logs/room_status.jsonl", "a", encoding="utf-8")
+
+logger = logging.getLogger()
+
+clients = {}
+log_files = {}
+
+with open("cookie.txt", "r", encoding="utf-8") as f:
+    SESSDATA = f.read().strip()
+
+session: Optional[aiohttp.ClientSession] = None
+
+TEST_ROOM_IDS: list[int] = room_from_config()
+
+signal.signal(signal.SIGHUP, handle_sighup)
+
 jsonl_logger = JsonlLogger()
 
 if __name__ == "__main__":
diff --git a/redis_util.py b/redis_util.py
new file mode 100644
index 0000000..c57ffd1
--- /dev/null
+++ b/redis_util.py
@@ -0,0 +1,43 @@
+import threading
+from typing import Callable
+
+import redis
+
+room_list_key = "bilibili.live.danmu.room_list"
+r: redis.Redis | None = None
+on_room_changed: Callable[[list[int]], None] = lambda ignore: None
+
+
+def init(redis_conf: dict | None):
+    global r
+
+    if redis_conf is None:
+        return
+
+    r = redis.Redis(
+        host=redis_conf.get("host", "127.0.0.1"),
+        port=redis_conf.get("port", 6379),
+        db=redis_conf.get("port", 0),
+    )
+
+    # run subscribe_redis on new thread
+    threading.Thread(target=subscribe_redis, daemon=True).start()
+
+
+def get_room_list() -> list[int]:
+    if r is None:
+        return []
+
+    return [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))]
+
+
+def subscribe_redis():
+    p = r.pubsub()
+    p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler})
+    for message in p.listen():
+        pass
+
+
+# noinspection PyUnusedLocal
+def room_changed_handler(ignore):
+    on_room_changed(get_room_list())