blivedm/live_status_ntf.py
2024-10-14 10:49:53 +08:00

166 lines
4.1 KiB
Python

import os
import time
import requests
import schedule
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import json
import db
import re
path = "logs"
room_id_re = re.compile(rf"{path}/(\d+).jsonl$")
class Room:
position = 0
# 0 unkonwn, 1 live, 2 pending
state = 0
state_changed = False
def __init__(self, room_id) -> None:
self.room_id = room_id
self.db_room = db.get_room(room_id)
def reset(self):
self.position = 0
self.state = 0
self.state_changed = False
def update_state(self, state):
self.state = state
self.state_changed = True
_rooms: dict[str, Room] = {}
def get_room(src_path):
room_id = room_id_re.findall(src_path)[0]
room = _rooms.get(room_id, None)
if room is None:
room = Room(room_id)
_rooms[room_id] = room
return room
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
if event.is_directory:
return
if not room_id_re.match(event.src_path):
return
room: Room = get_room(event.src_path)
try:
with open(event.src_path, "r", encoding="utf-8") as f:
if room.position > os.path.getsize(event.src_path):
room.reset()
f.seek(room.position)
room.position = os.path.getsize(event.src_path)
for line in f:
try:
data = json.loads(line)
except json.decoder.JSONDecodeError:
continue
self._handle_data(room, data)
except UnicodeDecodeError:
pass
self._check_room_state_change(room)
def on_created(self, event):
if event.is_directory:
return
if not room_id_re.match(event.src_path):
return
room: Room = get_room(event.src_path)
room.position = os.path.getsize(event.src_path)
try:
with open(event.src_path, "r", encoding="utf-8") as f:
for line in f:
try:
data = json.loads(line)
except json.decoder.JSONDecodeError:
continue
self._handle_data(data)
except UnicodeDecodeError:
pass
self._check_room_state_change(room)
def on_deleted(self, event):
if event.is_directory:
return
if not room_id_re.match(event.src_path):
return
room = _rooms.get(event.src_path, Room())
room.reset()
def _handle_data(self, room: Room, data):
if data["cmd"] == "LIVE":
if room.state == 1:
return
room.update_state(1)
elif data["cmd"] == "PREPARING":
if room.state == 2:
return
room.update_state(2)
def _check_room_state_change(self, room: Room):
if not room.state_changed:
return
room.state_changed = False
if room.state == 1:
msg = "\n".join([
f"{room.db_room.liver_name} 开锅了!",
f"https://live.bilibili.com/{room.room_id}"
])
requests.post("http://turntf:18846/notify", json={"msg": msg})
elif room.state == 2:
msg = "\n".join([
f"{room.db_room.liver_name} 下锅了!",
f"https://live.bilibili.com/{room.room_id}"
])
requests.post("http://turntf:18846/notify", json={"msg": msg})
def main():
for f in os.listdir(path):
f = path+"/"+str(f)
if not room_id_re.match(f):
continue
get_room(f).position = os.path.getsize(f)
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, path, recursive=True)
observer.start()
try:
while True:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
if __name__ == "__main__":
main()