blivedm/sample.py

138 lines
3.7 KiB
Python

# -*- coding: utf-8 -*-
import asyncio
import http.cookies
import json
import logging
import os
import threading
from typing import *
import aiohttp
import redis
import blivedm
from blivedm.clients import ws_base
# Emit INFO and above through the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

# Redis list that holds the ids of the rooms to record.
room_list_key = "bilibili.live.danmu.room_list"
r = redis.Redis(host='redis', port=6379, db=0)

# Room ids read once at import time, de-duplicated through a set.
TEST_ROOM_IDS = list(map(int, set(r.lrange(room_list_key, 0, -1))))

# Registries keyed by room id: started clients and open JSONL log files.
clients = {}
log_files = {}

# cookie.txt contains the value used for the SESSDATA cookie.
with open("cookie.txt", "r", encoding="utf-8") as f:
    SESSDATA = f.read().strip()

# Shared HTTP session; remains None until init_session() creates it.
session: Optional[aiohttp.ClientSession] = None
# Event loop that runs main(). It is recorded so that code running on the
# Redis subscriber thread can hand coroutine work back to the loop that owns
# the clients and the shared session.
_main_loop: Optional[asyncio.AbstractEventLoop] = None


async def main():
    """Connect to every configured room and keep recording until stopped.

    Starts the periodic log flush, creates the shared session, connects one
    client per id in TEST_ROOM_IDS, launches the Redis subscriber thread and
    then waits on the clients forever. On the way out every client is
    stopped, the log files are closed and the session is closed.

    Raises:
        Exception: whatever a failing initial room connection raised.
    """
    global _main_loop
    _main_loop = asyncio.get_running_loop()
    flush_files()
    init_session()
    try:
        for room_id in TEST_ROOM_IDS:
            await _connect_room(room_id)
        # run subscribe_redis on new thread
        threading.Thread(target=subscribe_redis, daemon=True).start()
        while True:
            # Snapshot: the registry may change while we are suspended.
            await asyncio.gather(*(
                client.join() for client in list(clients.values())
            ))
            await asyncio.sleep(1)
    finally:
        try:
            for client in list(clients.values()):
                await client.stop_and_close()
            for room_id in list(log_files):
                _close_log_file(room_id)
        finally:
            # Close the session even if stopping a client failed.
            await session.close()


async def _connect_room(room_id: int) -> None:
    """Create, register and start the client for one room.

    Must run on the event loop that executes main(). If anything fails after
    the client object exists, the client is stopped and closed before the
    original exception is re-raised.
    """
    client = None
    try:
        logger.info("connect to room %s", room_id)
        client = blivedm.BLiveClient(room_id, session=session)
        client.set_handler(jsonl_logger)
        client.start()
        clients[room_id] = client
        logger.info("connect to room %s success", room_id)
    except Exception:
        if client is not None:
            await client.stop_and_close()
        raise


def _close_log_file(room_id: int) -> None:
    """Close and forget the log file registered under room_id, if any."""
    log_file = log_files.pop(room_id, None)
    if log_file is not None:
        log_file.close()


async def _disconnect_room(room_id: int) -> None:
    """Stop the client of one room, then drop it and its log file."""
    await clients[room_id].stop_and_close()
    del clients[room_id]
    # NOTE(review): log files are keyed by client.room_id while this lookup
    # uses the id taken from the Redis list - assumes both are the same
    # value; confirm against blivedm.
    _close_log_file(room_id)


async def _sync_rooms(room_ids: Set[int]) -> None:
    """Make the set of connected rooms equal to room_ids.

    Runs on the main event loop. A failure for one room is logged and does
    not prevent the remaining rooms from being processed.
    """
    connected = set(clients)
    for removed in connected - room_ids:
        try:
            await _disconnect_room(removed)
        except Exception:
            logger.exception("failed to disconnect from room %s", removed)
    for added in room_ids - connected:
        try:
            await _connect_room(added)
        except Exception:
            logger.exception("failed to connect to room %s", added)


def flush_files():
    """Flush every open log file and re-arm a one second daemon timer.

    Each call schedules the next one, so after the first call the flush
    repeats roughly once per second on timer threads.
    """
    try:
        # Iterate over a snapshot: entries are added and removed elsewhere
        # while this runs on a timer thread.
        for log_file in list(log_files.values()):
            try:
                log_file.flush()
            except ValueError:
                # The file was closed between the snapshot and the flush.
                pass
    finally:
        # Re-arm unconditionally so one failed flush cannot end the cycle.
        timer = threading.Timer(1, flush_files)
        timer.daemon = True
        timer.start()


def subscribe_redis():
    """Listen for keyspace events on the room list key; never returns.

    room_changed_handler is registered as the callback for the pattern.
    Any message that get_message() hands back is printed.
    """
    p = r.pubsub()
    p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler})
    while True:
        # Wait up to a second for traffic instead of spinning on the CPU.
        message = p.get_message(timeout=1.0)
        if message:
            print(message)


# noinspection PyUnusedLocal
def room_changed_handler(message):
    """Reload the room list from Redis and reconcile the running clients.

    Called by the pub/sub machinery on the subscriber thread. The clients
    were created on the loop that runs main(), so the reconciliation is
    submitted to that loop rather than run through asyncio.run(), which
    would execute it on a separate, throwaway loop. The call blocks until
    the loop has finished applying the change.

    Args:
        message: the pub/sub notification; its content is not used.
    """
    global TEST_ROOM_IDS
    TEST_ROOM_IDS = {int(room_id) for room_id in r.lrange(room_list_key, 0, -1)}
    if _main_loop is None:
        logger.warning("room list changed before main() started; ignoring")
        return
    asyncio.run_coroutine_threadsafe(
        _sync_rooms(TEST_ROOM_IDS), _main_loop
    ).result()
def init_session():
    """Create the shared aiohttp session and load the SESSDATA cookie into it.

    The new session is stored in the module-level ``session`` variable. The
    cookie value comes from SESSDATA and is scoped to the bilibili.com domain.
    """
    global session

    jar_input = http.cookies.SimpleCookie()
    jar_input['SESSDATA'] = SESSDATA
    sessdata_morsel = jar_input['SESSDATA']
    sessdata_morsel['domain'] = 'bilibili.com'

    session = aiohttp.ClientSession()
    session.cookie_jar.update_cookies(jar_input)
class JsonlLogger(blivedm.BaseHandler):
    """Handler that appends every received message to a per-room JSONL file."""

    def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict):
        """Write ``message`` as one compact JSON line to logs/<room_id>.jsonl.

        The file is opened in append mode on the first message of a room and
        kept in ``log_files`` for later writes.
        """
        room_id = client.room_id
        if room_id not in log_files:
            os.makedirs("logs", exist_ok=True)
            log_files[room_id] = open(f"logs/{room_id}.jsonl", "a", encoding="utf-8")
        encoded = json.dumps(message, separators=(',', ':'))
        log_files[room_id].write(f"{encoded}\n")
# Single handler instance that is attached to every client.
jsonl_logger = JsonlLogger()


if __name__ == "__main__":
    # Script entry point: run main() on a fresh event loop.
    asyncio.run(main())