"""Track the list of live rooms stored in Redis and notify on changes.

The room list lives in the Redis list ``room_list_key``. After ``init`` is
called with a configuration, a daemon thread subscribes to keyspace events
for that key and calls ``on_room_changed`` with the fresh list on each event.
"""
import threading
from typing import Callable

import redis

# Redis list key holding the room ids.
room_list_key = "bilibili.live.danmu.room_list"

# Shared client; stays None until init() is called with a configuration.
r: redis.Redis | None = None

# Database index the client is bound to; used to build the keyspace channel.
_db: int = 0

# Hook invoked with the current room list whenever the list key changes.
# Rebind this module attribute to install a real callback.
on_room_changed: Callable[[list[int]], None] = lambda ignore: None


def init(redis_conf: dict | None):
    """Create the Redis client and start the background subscriber.

    Args:
        redis_conf: Optional mapping with the keys ``host`` (default
            ``"127.0.0.1"``), ``port`` (default ``6379``) and ``db``
            (default ``0``). When ``None`` the module stays disabled and
            ``get_room_list`` keeps returning an empty list.
    """
    global r, _db
    if redis_conf is None:
        return

    # The database index is read from "db"; it was previously read from
    # "port", which selected a database numbered after the TCP port.
    _db = redis_conf.get("db", 0)
    r = redis.Redis(
        host=redis_conf.get("host", "127.0.0.1"),
        port=redis_conf.get("port", 6379),
        db=_db,
    )

    # Run subscribe_redis on its own daemon thread so its blocking listen
    # loop neither stalls the caller nor keeps the process alive on exit.
    threading.Thread(target=subscribe_redis, daemon=True).start()


def get_room_list() -> list[int]:
    """Return the distinct room ids stored in Redis.

    Returns:
        The ids found in the ``room_list_key`` list, converted to ``int``,
        with duplicates removed and first-seen order preserved. An empty
        list when the client has not been initialised.
    """
    if r is None:
        return []
    raw_ids = r.lrange(room_list_key, 0, -1)
    # dict.fromkeys de-duplicates while keeping a deterministic order.
    return [int(room_id) for room_id in dict.fromkeys(raw_ids)]


def subscribe_redis():
    """Listen for keyspace events on the room list key (blocking).

    Registers ``room_changed_handler`` for the keyspace channel of
    ``room_list_key`` in the configured database, then iterates the pub/sub
    stream indefinitely. Returns immediately if the client is not set up.

    NOTE(review): keyspace events are only published when the server has
    ``notify-keyspace-events`` enabled -- confirm the deployment config.
    """
    if r is None:
        return

    p = r.pubsub()
    # Keyspace channels are scoped per database, so use the configured index
    # rather than a fixed 0.
    channel = f"__keyspace@{_db}__:{room_list_key}"
    p.psubscribe(**{channel: room_changed_handler})

    # Messages are delivered to the registered handler; iterating listen()
    # is what drives that dispatch, so the loop body has nothing to do.
    for _ in p.listen():
        pass


# noinspection PyUnusedLocal
def room_changed_handler(ignore):
    """Pub/sub callback: push the refreshed room list to ``on_room_changed``.

    Args:
        ignore: The pub/sub message; unused, since the list is re-read
            from Redis on every event.
    """
    on_room_changed(get_room_list())