From c7eea9c6d94387f4982a12005f8f505c6598732a Mon Sep 17 00:00:00 2001 From: tursom Date: Fri, 1 Mar 2024 14:48:55 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=B0=E5=BD=95=E7=9B=B4=E6=92=AD=E5=8E=9F?= =?UTF-8?q?=E5=A7=8B=E6=95=B0=E6=8D=AE=EF=BC=8C=E6=B7=BB=E5=8A=A0redis?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E7=9A=84=E7=9B=B4=E6=92=AD=E9=97=B4=E5=88=97?= =?UTF-8?q?=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 + blivedm/handlers.py | 12 +++- requirements.txt | 1 + sample.py | 164 ++++++++++++++++++++++++-------------------- 4 files changed, 102 insertions(+), 78 deletions(-) diff --git a/.gitignore b/.gitignore index 101adc4..d3d675d 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,6 @@ ENV/ .idea/ + +cookie.txt +logs diff --git a/blivedm/handlers.py b/blivedm/handlers.py index 76caca6..81c4301 100644 --- a/blivedm/handlers.py +++ b/blivedm/handlers.py @@ -58,6 +58,7 @@ def _make_msg_callback(method_name, message_cls): def callback(self: 'BaseHandler', client: ws_base.WebSocketClientBase, command: dict): method = getattr(self, method_name) return method(client, message_cls.from_command(command['data'])) + return callback @@ -128,6 +129,11 @@ class BaseHandler(HandlerInterface): if callback is not None: callback(self, client, command) + self._global_callback(client, command) + + def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict): + pass + def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage): """ 收到心跳包 @@ -154,7 +160,7 @@ class BaseHandler(HandlerInterface): """ def _on_super_chat_delete( - self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage + self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage ): """ 删除醒目留言 @@ -180,14 +186,14 @@ class BaseHandler(HandlerInterface): """ def _on_open_live_super_chat( - self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage + self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage ): """ 醒目留言 """ def _on_open_live_super_chat_delete( - self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage + self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage ): """ 删除醒目留言 diff --git a/requirements.txt b/requirements.txt index a0e5dc3..e0bfadb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aiohttp~=3.9.0 Brotli~=1.1.0 yarl~=1.9.3 +redis~=5.0.2 diff --git a/sample.py b/sample.py index d54ef15..d953303 100644 --- a/sample.py +++ b/sample.py @@ -1,25 +1,33 @@ # -*- coding: utf-8 -*- import asyncio import http.cookies +import json +import logging +import os import random +import threading from typing import * import aiohttp +import redis import blivedm -import blivedm.models.web as web_models +from blivedm.clients import ws_base -# 直播间ID的取值看直播间URL -TEST_ROOM_IDS = [ - 12235923, - 14327465, - 21396545, - 21449083, - 23105590, -] +# set log level to INFO +logging.basicConfig(level=logging.INFO) -# 这里填一个已登录账号的cookie。不填cookie也可以连接,但是收到弹幕的用户名会打码,UID会变成0 -SESSDATA = '' +logger = logging.getLogger() + +room_list_key = "bilibili.live.danmu.room_list" +r = redis.Redis(host='redis', port=6379, db=0) + +TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))] +clients = {} +log_files = {} + +with open("cookie.txt", "r", encoding="utf-8") as f: + SESSDATA = f.read().strip() session: Optional[aiohttp.ClientSession] = None @@ -27,12 +35,72 @@ session: Optional[aiohttp.ClientSession] = None async def main(): init_session() try: - await run_single_client() - await run_multi_clients() + for room_id in TEST_ROOM_IDS: + client = None + try: + logger.info("connect to room %s", room_id) + client = blivedm.BLiveClient(room_id, session=session) + client.set_handler(jsonl_logger) + client.start() + clients[room_id] = client + logger.info("connect to room %s success", room_id) + except Exception as e: + if client is not None: + await client.stop_and_close() + + raise e + + # run subscribe_redis on new thread + threading.Thread(target=subscribe_redis).start() + + while True: + await asyncio.gather(*( + clients[room_id].join() for room_id in clients + )) finally: + for room_id in clients: + await clients[room_id].stop_and_close() await session.close() +def subscribe_redis(): + p = r.pubsub() + p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler}) + while True: + message = p.get_message() + if message: + print(message) + + +# noinspection PyUnusedLocal +def room_changed_handler(message): + global TEST_ROOM_IDS + TEST_ROOM_IDS = set(int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))) + + for removed in clients.keys() - TEST_ROOM_IDS: + asyncio.run(clients[removed].stop_and_close()) + del clients[removed] + + if removed in log_files: + log_files[removed].close() + del log_files[removed] + + for added in TEST_ROOM_IDS - clients.keys(): + client = None + try: + logger.info("connect to room %s", added) + client = blivedm.BLiveClient(added, session=session) + client.set_handler(jsonl_logger) + client.start() + clients[added] = client + logger.info("connect to room %s success", added) + except Exception as e: + if client is not None: + asyncio.run(client.stop_and_close()) + + raise e + + def init_session(): cookies = http.cookies.SimpleCookie() cookies['SESSDATA'] = SESSDATA @@ -43,72 +111,18 @@ def init_session(): session.cookie_jar.update_cookies(cookies) -async def run_single_client(): - """ - 演示监听一个直播间 - """ - room_id = random.choice(TEST_ROOM_IDS) - client = blivedm.BLiveClient(room_id, session=session) - handler = MyHandler() - client.set_handler(handler) +class JsonlLogger(blivedm.BaseHandler): + def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict): + if client.room_id not in log_files: + # call mkdirs + os.makedirs("logs", exist_ok=True) - client.start() - try: - # 演示5秒后停止 - await asyncio.sleep(5) - client.stop() + log_files[client.room_id] = open(f"logs/{client.room_id}.jsonl", "a", encoding="utf-8") - await client.join() - finally: - await client.stop_and_close() + log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'))}\n") -async def run_multi_clients(): - """ - 演示同时监听多个直播间 - """ - clients = [blivedm.BLiveClient(room_id, session=session) for room_id in TEST_ROOM_IDS] - handler = MyHandler() - for client in clients: - client.set_handler(handler) - client.start() - - try: - await asyncio.gather(*( - client.join() for client in clients - )) - finally: - await asyncio.gather(*( - client.stop_and_close() for client in clients - )) - - -class MyHandler(blivedm.BaseHandler): - # # 演示如何添加自定义回调 - # _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy() - # - # # 入场消息回调 - # def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict): - # print(f"[{client.room_id}] INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id}," - # f" uname={command['data']['uname']}") - # _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa - - def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage): - print(f'[{client.room_id}] 心跳') - - def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage): - print(f'[{client.room_id}] {message.uname}:{message.msg}') - - def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage): - print(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}' - f' ({message.coin_type}瓜子x{message.total_coin})') - - def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage): - print(f'[{client.room_id}] {message.username} 购买{message.gift_name}') - - def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage): - print(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}:{message.message}') - +jsonl_logger = JsonlLogger() if __name__ == '__main__': asyncio.run(main())