From 8cdb43d8ffd2905378b050610d0b28e5c563bd19 Mon Sep 17 00:00:00 2001 From: Cam <yulinfeng000@gmail.com> Date: Fri, 7 Jan 2022 10:29:24 +0800 Subject: [PATCH] add middleware --- app.py | 2 +- blive/framework.py | 63 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/app.py b/app.py index 29e8cb0..651364d 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,7 @@ from blive import BLiver, Events, BLiverCtx from blive.msg import DanMuMsg, HotRankChangeV2Msg, InteractWordMsg, SendGiftMsg -app = BLiver(7777, log_level="DEBUG") +app = BLiver(22820500) @app.on(Events.DANMU_MSG) diff --git a/blive/framework.py b/blive/framework.py index 35aa33d..8092b7c 100644 --- a/blive/framework.py +++ b/blive/framework.py @@ -1,7 +1,8 @@ import sys import json import asyncio -from typing import Dict, List, Tuple, Union +from abc import ABC, abstractmethod +from typing import Awaitable, Dict, List, Tuple, Union import aiohttp from aiohttp.client_ws import ClientWebSocketResponse from aiohttp.http_websocket import WSMessage @@ -32,24 +33,61 @@ class BLiverCtx(object): self.body: Dict = None # 消息内容 +class Middleware(ABC): + def warp(self, func): + self.__func = func + return self + + async def __call__(self, ctx: BLiverCtx): + return await self.proxy(ctx, self.__func) + + @abstractmethod + async def proxy(self, ctx: BLiverCtx, func: Awaitable): + raise NotImplementedError + + @abstractmethod + def register_self(self, app: "BLiver"): + raise NotImplementedError + # app.register_middleware(Events.DANMU_MSG,self) + + +class Channel: + def __init__(self) -> None: + self.listeners: List[Union[Middleware, Awaitable]] = [] + self.middlewares: List[Middleware] = [] + + def register_handler(self, handler): + self.listeners.append(handler) + + def apply_middleware(self, middleware: Middleware): + self.listeners = [middleware.warp(h) for h in self.listeners] + + def __iter__(self): + return iter(self.listeners) + + class Processor: def __init__(self, logger=None) -> None: self.logger = logger or loguru.logger - self.channels = {} + self.channels: Dict[str, Channel] = {} for e in Events: - self.channels[e] = [] + self.channels[e] = Channel() - def register(self, channel, handler): - handlers = self.channels.get(channel, None) - handlers.append(handler) + def register(self, channel: str, handler: Awaitable): + channel = self.channels.get(channel, None) + channel.register_handler(handler) + + def apply_middleware(self, channel, middleware): + channel = self.channels.get(channel, None) + channel.apply_middleware(middleware) async def process(self, ctx): header: PackageHeader = ctx.msg[0] msg = json.loads(ctx.msg[1]) ctx.body = msg if header.operation == Operation.NOTIFY: - handlers = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄 - await asyncio.gather(*[c(ctx) for c in handlers]) + listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄 + return await asyncio.gather(*[f(ctx) for f in listeners]) class BLiver: @@ -65,6 +103,10 @@ class BLiver: self._ws: ClientWebSocketResponse = None self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") self.processor = Processor(logger=self.logger) + self.middlewares = [] + + def register_middleware(self, channel: str, middleware: Middleware): + self.middlewares.append((channel, middleware)) def on(self, event: Union[Events, List[Events]]): def f_wrapper(func): @@ -126,7 +168,10 @@ class BLiver: self.logger.debug("heartbeat sended") async def listen(self): - + # apply middleware + for mw in self.middlewares: + self.processor.apply_middleware(mw[0], mw[1]) + # start listening url, token = get_blive_ws_url(self.real_roomid) async with aiohttp.ClientSession().ws_connect(url) as ws: self._ws = ws