mirror of
https://github.com/yulinfeng000/blive.git
synced 2025-03-25 16:50:53 +08:00
add middleware
This commit is contained in:
parent
3ec28742a5
commit
8cdb43d8ff
2
app.py
2
app.py
@ -1,7 +1,7 @@
|
||||
from blive import BLiver, Events, BLiverCtx
|
||||
from blive.msg import DanMuMsg, HotRankChangeV2Msg, InteractWordMsg, SendGiftMsg
|
||||
|
||||
app = BLiver(7777, log_level="DEBUG")
|
||||
app = BLiver(22820500)
|
||||
|
||||
|
||||
@app.on(Events.DANMU_MSG)
|
||||
|
@ -1,7 +1,8 @@
|
||||
import sys
|
||||
import json
|
||||
import asyncio
|
||||
from typing import Dict, List, Tuple, Union
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Awaitable, Dict, List, Tuple, Union
|
||||
import aiohttp
|
||||
from aiohttp.client_ws import ClientWebSocketResponse
|
||||
from aiohttp.http_websocket import WSMessage
|
||||
@ -32,24 +33,61 @@ class BLiverCtx(object):
|
||||
self.body: Dict = None # 消息内容
|
||||
|
||||
|
||||
class Middleware(ABC):
|
||||
def warp(self, func):
|
||||
self.__func = func
|
||||
return self
|
||||
|
||||
async def __call__(self, ctx: BLiverCtx):
|
||||
return await self.proxy(ctx, self.__func)
|
||||
|
||||
@abstractmethod
|
||||
async def proxy(self, ctx: BLiverCtx, func: Awaitable):
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def register_self(self, app: "BLiver"):
|
||||
raise NotImplementedError
|
||||
# app.register_middleware(Events.DANMU_MSG,self)
|
||||
|
||||
|
||||
class Channel:
|
||||
def __init__(self) -> None:
|
||||
self.listeners: List[Union[Middleware, Awaitable]] = []
|
||||
self.middlewares: List[Middleware] = []
|
||||
|
||||
def register_handler(self, handler):
|
||||
self.listeners.append(handler)
|
||||
|
||||
def apply_middleware(self, middleware: Middleware):
|
||||
self.listeners = [middleware.warp(h) for h in self.listeners]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.listeners)
|
||||
|
||||
|
||||
class Processor:
|
||||
def __init__(self, logger=None) -> None:
|
||||
self.logger = logger or loguru.logger
|
||||
self.channels = {}
|
||||
self.channels: Dict[str, Channel] = {}
|
||||
for e in Events:
|
||||
self.channels[e] = []
|
||||
self.channels[e] = Channel()
|
||||
|
||||
def register(self, channel, handler):
|
||||
handlers = self.channels.get(channel, None)
|
||||
handlers.append(handler)
|
||||
def register(self, channel: str, handler: Awaitable):
|
||||
channel = self.channels.get(channel, None)
|
||||
channel.register_handler(handler)
|
||||
|
||||
def apply_middleware(self, channel, middleware):
|
||||
channel = self.channels.get(channel, None)
|
||||
channel.apply_middleware(middleware)
|
||||
|
||||
async def process(self, ctx):
|
||||
header: PackageHeader = ctx.msg[0]
|
||||
msg = json.loads(ctx.msg[1])
|
||||
ctx.body = msg
|
||||
if header.operation == Operation.NOTIFY:
|
||||
handlers = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄
|
||||
await asyncio.gather(*[c(ctx) for c in handlers])
|
||||
listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄
|
||||
return await asyncio.gather(*[f(ctx) for f in listeners])
|
||||
|
||||
|
||||
class BLiver:
|
||||
@ -65,6 +103,10 @@ class BLiver:
|
||||
self._ws: ClientWebSocketResponse = None
|
||||
self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
|
||||
self.processor = Processor(logger=self.logger)
|
||||
self.middlewares = []
|
||||
|
||||
def register_middleware(self, channel: str, middleware: Middleware):
|
||||
self.middlewares.append((channel, middleware))
|
||||
|
||||
def on(self, event: Union[Events, List[Events]]):
|
||||
def f_wrapper(func):
|
||||
@ -126,7 +168,10 @@ class BLiver:
|
||||
self.logger.debug("heartbeat sended")
|
||||
|
||||
async def listen(self):
|
||||
|
||||
# apply middleware
|
||||
for mw in self.middlewares:
|
||||
self.processor.apply_middleware(mw[0], mw[1])
|
||||
# start listening
|
||||
url, token = get_blive_ws_url(self.real_roomid)
|
||||
async with aiohttp.ClientSession().ws_connect(url) as ws:
|
||||
self._ws = ws
|
||||
|
Loading…
Reference in New Issue
Block a user