记录直播原始数据,添加redis控制的直播间列表

This commit is contained in:
tursom 2024-03-01 14:48:55 +08:00
parent fc55b75dab
commit c7eea9c6d9
4 changed files with 102 additions and 78 deletions

3
.gitignore vendored
View File

@ -100,3 +100,6 @@ ENV/
.idea/ .idea/
cookie.txt
logs

View File

@ -58,6 +58,7 @@ def _make_msg_callback(method_name, message_cls):
def callback(self: 'BaseHandler', client: ws_base.WebSocketClientBase, command: dict): def callback(self: 'BaseHandler', client: ws_base.WebSocketClientBase, command: dict):
method = getattr(self, method_name) method = getattr(self, method_name)
return method(client, message_cls.from_command(command['data'])) return method(client, message_cls.from_command(command['data']))
return callback return callback
@ -128,6 +129,11 @@ class BaseHandler(HandlerInterface):
if callback is not None: if callback is not None:
callback(self, client, command) callback(self, client, command)
self._global_callback(client, command)
def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict):
pass
def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage): def _on_heartbeat(self, client: ws_base.WebSocketClientBase, message: web_models.HeartbeatMessage):
""" """
收到心跳包 收到心跳包
@ -154,7 +160,7 @@ class BaseHandler(HandlerInterface):
""" """
def _on_super_chat_delete( def _on_super_chat_delete(
self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage self, client: ws_base.WebSocketClientBase, message: web_models.SuperChatDeleteMessage
): ):
""" """
删除醒目留言 删除醒目留言
@ -180,14 +186,14 @@ class BaseHandler(HandlerInterface):
""" """
def _on_open_live_super_chat( def _on_open_live_super_chat(
self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatMessage
): ):
""" """
醒目留言 醒目留言
""" """
def _on_open_live_super_chat_delete( def _on_open_live_super_chat_delete(
self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage self, client: ws_base.WebSocketClientBase, message: open_models.SuperChatDeleteMessage
): ):
""" """
删除醒目留言 删除醒目留言

View File

@ -1,3 +1,4 @@
aiohttp~=3.9.0 aiohttp~=3.9.0
Brotli~=1.1.0 Brotli~=1.1.0
yarl~=1.9.3 yarl~=1.9.3
redis~=5.0.2

164
sample.py
View File

@ -1,25 +1,33 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import asyncio import asyncio
import http.cookies import http.cookies
import json
import logging
import os
import random import random
import threading
from typing import * from typing import *
import aiohttp import aiohttp
import redis
import blivedm import blivedm
import blivedm.models.web as web_models from blivedm.clients import ws_base
# 直播间ID的取值看直播间URL # set log level to INFO
TEST_ROOM_IDS = [ logging.basicConfig(level=logging.INFO)
12235923,
14327465,
21396545,
21449083,
23105590,
]
# 这里填一个已登录账号的cookie。不填cookie也可以连接但是收到弹幕的用户名会打码UID会变成0 logger = logging.getLogger()
SESSDATA = ''
room_list_key = "bilibili.live.danmu.room_list"
r = redis.Redis(host='redis', port=6379, db=0)
TEST_ROOM_IDS = [int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1))]
clients = {}
log_files = {}
with open("cookie.txt", "r", encoding="utf-8") as f:
SESSDATA = f.read().strip()
session: Optional[aiohttp.ClientSession] = None session: Optional[aiohttp.ClientSession] = None
@ -27,12 +35,72 @@ session: Optional[aiohttp.ClientSession] = None
async def main(): async def main():
init_session() init_session()
try: try:
await run_single_client() for room_id in TEST_ROOM_IDS:
await run_multi_clients() client = None
try:
logger.info("connect to room %s", room_id)
client = blivedm.BLiveClient(room_id, session=session)
client.set_handler(jsonl_logger)
client.start()
clients[room_id] = client
logger.info("connect to room %s success", room_id)
except Exception as e:
if client is not None:
await client.stop_and_close()
raise e
# run subscribe_redis on new thread
threading.Thread(target=subscribe_redis).start()
while True:
await asyncio.gather(*(
clients[room_id].join() for room_id in clients
))
finally: finally:
for room_id in clients:
await clients[room_id].stop_and_close()
await session.close() await session.close()
def subscribe_redis():
p = r.pubsub()
p.psubscribe(**{f"__keyspace@0__:{room_list_key}": room_changed_handler})
while True:
message = p.get_message()
if message:
print(message)
# noinspection PyUnusedLocal
def room_changed_handler(message):
global TEST_ROOM_IDS
TEST_ROOM_IDS = set(int(room_id) for room_id in set(r.lrange(room_list_key, 0, -1)))
for removed in clients.keys() - TEST_ROOM_IDS:
asyncio.run(clients[removed].stop_and_close())
del clients[removed]
if removed in log_files:
log_files[removed].close()
del log_files[removed]
for added in TEST_ROOM_IDS - clients.keys():
client = None
try:
logger.info("connect to room %s", added)
client = blivedm.BLiveClient(added, session=session)
client.set_handler(jsonl_logger)
client.start()
clients[added] = client
logger.info("connect to room %s success", added)
except Exception as e:
if client is not None:
asyncio.run(client.stop_and_close())
raise e
def init_session(): def init_session():
cookies = http.cookies.SimpleCookie() cookies = http.cookies.SimpleCookie()
cookies['SESSDATA'] = SESSDATA cookies['SESSDATA'] = SESSDATA
@ -43,72 +111,18 @@ def init_session():
session.cookie_jar.update_cookies(cookies) session.cookie_jar.update_cookies(cookies)
async def run_single_client(): class JsonlLogger(blivedm.BaseHandler):
""" def _global_callback(self, client: ws_base.WebSocketClientBase, message: dict):
演示监听一个直播间 if client.room_id not in log_files:
""" # call mkdirs
room_id = random.choice(TEST_ROOM_IDS) os.makedirs("logs", exist_ok=True)
client = blivedm.BLiveClient(room_id, session=session)
handler = MyHandler()
client.set_handler(handler)
client.start() log_files[client.room_id] = open(f"logs/{client.room_id}.jsonl", "a", encoding="utf-8")
try:
# 演示5秒后停止
await asyncio.sleep(5)
client.stop()
await client.join() log_files[client.room_id].write(f"{json.dumps(message, separators=(',', ':'))}\n")
finally:
await client.stop_and_close()
async def run_multi_clients(): jsonl_logger = JsonlLogger()
"""
演示同时监听多个直播间
"""
clients = [blivedm.BLiveClient(room_id, session=session) for room_id in TEST_ROOM_IDS]
handler = MyHandler()
for client in clients:
client.set_handler(handler)
client.start()
try:
await asyncio.gather(*(
client.join() for client in clients
))
finally:
await asyncio.gather(*(
client.stop_and_close() for client in clients
))
class MyHandler(blivedm.BaseHandler):
# # 演示如何添加自定义回调
# _CMD_CALLBACK_DICT = blivedm.BaseHandler._CMD_CALLBACK_DICT.copy()
#
# # 入场消息回调
# def __interact_word_callback(self, client: blivedm.BLiveClient, command: dict):
# print(f"[{client.room_id}] INTERACT_WORD: self_type={type(self).__name__}, room_id={client.room_id},"
# f" uname={command['data']['uname']}")
# _CMD_CALLBACK_DICT['INTERACT_WORD'] = __interact_word_callback # noqa
def _on_heartbeat(self, client: blivedm.BLiveClient, message: web_models.HeartbeatMessage):
print(f'[{client.room_id}] 心跳')
def _on_danmaku(self, client: blivedm.BLiveClient, message: web_models.DanmakuMessage):
print(f'[{client.room_id}] {message.uname}{message.msg}')
def _on_gift(self, client: blivedm.BLiveClient, message: web_models.GiftMessage):
print(f'[{client.room_id}] {message.uname} 赠送{message.gift_name}x{message.num}'
f' {message.coin_type}瓜子x{message.total_coin}')
def _on_buy_guard(self, client: blivedm.BLiveClient, message: web_models.GuardBuyMessage):
print(f'[{client.room_id}] {message.username} 购买{message.gift_name}')
def _on_super_chat(self, client: blivedm.BLiveClient, message: web_models.SuperChatMessage):
print(f'[{client.room_id}] 醒目留言 ¥{message.price} {message.uname}{message.message}')
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(main()) asyncio.run(main())