diff --git a/db.py b/db.py new file mode 100644 index 0000000..9db4fff --- /dev/null +++ b/db.py @@ -0,0 +1,35 @@ + +import mysql.connector +import schedule + +class Room: + def __init__(self, room_id, liver_uid, liver_name): + self.room_id = room_id + self.liver_uid = liver_uid + self.liver_name = liver_name + + +rooms = {} + +connection = mysql.connector.connect( + host="mysql", + database="live_log", + user="live_log", + password="!36z@Jd6LM@uxktq", +) + + +def flush_room_info(): + with connection.cursor() as cursor: + cursor.execute( + "SELECT room_id, liver_uid, liver_name FROM liver_current_name") + for room_id, liver_uid, liver_name in cursor.fetchall(): + rooms[room_id] = Room(room_id, liver_uid, liver_name) + + +def get_room(room_id) -> Room: + return rooms.get(room_id, Room(room_id, 0, str(room_id))) + + +flush_room_info() +schedule.every(1).days.do(flush_room_info) diff --git a/live_status_ntf.py b/live_status_ntf.py new file mode 100644 index 0000000..5949d67 --- /dev/null +++ b/live_status_ntf.py @@ -0,0 +1,133 @@ +import os +import time +import requests +import schedule +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler +import json +import db +import re + +room_id_re = re.compile(r"\/(\d+).jsonl") + + +class Room: + positon = 0 + # 0 unkonwn, 1 live, 2 pending + state = 0 + state_changed = False + + def __init__(self, room_id) -> None: + self.room_id = room_id + self.db_room = db.get_room(room_id) + + def reset(self): + self.position = 0 + self.state = 0 + self.state_changed = False + + def update_state(self, state): + self.state = state + self.state_changed = True + + +_rooms: dict[str, Room] = {} + + +def get_room(src_path): + room_id = room_id_re.findall(src_path)[0] + return _rooms.get(room_id, Room(room_id)) + + +class MyHandler(FileSystemEventHandler): + def on_modified(self, event): + if event.is_directory: + return + + if not event.src_path.endswith(".jsonl"): + return + + room: Room = get_room(event.src_path) + + with open(event.src_path, "r", encoding="utf-8") as f: + + if room.position > os.path.getsize(event.src_path): + room.reset() + f.seek(room.position) + + room.position = os.path.getsize(event.src_path) + + for line in f: + data = json.loads(line) + self._handle_data(room, data) + + self._check_room_state_change(room) + + def on_created(self, event): + if event.is_directory: + return + + if not event.src_path.endswith(".jsonl"): + return + + room: Room = get_room(event.src_path) + room.position = os.path.getsize(event.src_path) + + with open(event.src_path, "r", encoding="utf-8") as f: + for line in f: + data = json.loads(line) + self._handle_data(data) + + self._check_room_state_change(room) + + def on_deleted(self, event): + if event.is_directory: + return + + if not event.src_path.endswith(".jsonl"): + return + + room = _rooms.get(event.src_path, Room()) + room.reset() + + def _handle_data(self, room: Room, data): + if data["cmd"] == "LIVE": + if room.state == 1: + return + + room.update_state(1) + + elif data["cmd"] == "PREPARING": + if room.state == 2: + return + + room.update_state(2) + + def _check_room_state_change(self, room: Room): + if not room.state_changed: + return + + room.state_changed = False + + if room.state == 1: + msg = f"{room.db_room.liver_name} 开锅了!" + requests.post("http://turntf:18846/notify", json={"msg": msg}) + + elif room.state == 2: + msg = f"{room.db_room.liver_name} 下锅了!" + requests.post("http://turntf:18846/notify", json={"msg": msg}) + + +if __name__ == "__main__": + path = "logs" + event_handler = MyHandler() + observer = Observer() + observer.schedule(event_handler, path, recursive=True) + observer.start() + try: + while True: + schedule.run_pending() + time.sleep(1) + except KeyboardInterrupt: + observer.stop() + observer.join() diff --git a/requirements.txt b/requirements.txt index ec85237..1955ef7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,7 @@ Brotli~=1.1.0 yarl~=1.9.3 redis~=5.0.2 PyYAML~=6.0.1 +watchdog~=5.0.3 +requests~=2.32.3 +mysql-connector-python~=9.0.0 +schedule~=1.2.2