diff --git a/README.md b/README.md index 3be3529..58c3dcd 100644 --- a/README.md +++ b/README.md @@ -65,4 +65,4 @@ - 打包发布 - 更多的消息操作类 -- ~~尝试加入中间件架构~~ 已经加入 +- 尝试加入中间件架构 diff --git a/blive/framework.py b/blive/framework.py index 05993bf..da9ca6c 100644 --- a/blive/framework.py +++ b/blive/framework.py @@ -1,8 +1,6 @@ import sys import json import asyncio -from contextvars import ContextVar -from abc import ABC, abstractmethod from typing import Awaitable, Dict, List, Tuple, Union import aiohttp from aiohttp.client_ws import ClientWebSocketResponse @@ -34,35 +32,17 @@ class BLiverCtx(object): self.body: Dict = None # 消息内容 -class Middleware(ABC): - def warp(self, func): - self.__func = func - return self - - async def __call__(self, ctx: BLiverCtx): - return await self.proxy(ctx, self.__func) - - @abstractmethod - async def proxy(self, ctx: BLiverCtx, func: Awaitable): - raise NotImplementedError - - @abstractmethod - def register_self(self, app: "BLiver"): - raise NotImplementedError - # app.register_middleware(Events.DANMU_MSG,self) - - class Channel: + """ + 消息类型,不直接用list代替的原因是方便后面加入middleware类 + """ + def __init__(self) -> None: - self.listeners: List[Union[Middleware, Awaitable]] = [] - self.middlewares: List[Middleware] = [] + self.listeners = [] def register_handler(self, handler): self.listeners.append(handler) - def apply_middleware(self, middleware: Middleware): - self.listeners = [middleware.warp(h) for h in self.listeners] - def __iter__(self): return iter(self.listeners) @@ -75,18 +55,14 @@ class Processor: self.channels[e] = Channel() def register(self, channel: str, handler: Awaitable): - channel = self.channels.get(channel, None) + channel = self.channels[channel] channel.register_handler(handler) - def apply_middleware(self, channel, middleware): - channel = self.channels.get(channel, None) - channel.apply_middleware(middleware) - async def process(self, ctx): header: PackageHeader = ctx.msg[0] - msg = json.loads(ctx.msg[1]) - ctx.body = msg if header.operation == Operation.NOTIFY: + msg = json.loads(ctx.msg[1]) + ctx.body = msg listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄 return await asyncio.gather(*[f(ctx) for f in listeners]) @@ -104,10 +80,6 @@ class BLiver: self._ws: ClientWebSocketResponse = None self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai") self.processor = Processor(logger=self.logger) - self.middlewares = [] - - def register_middleware(self, channel: str, middleware: Middleware): - self.middlewares.append((channel, middleware)) def on(self, event: Union[Events, List[Events]]): def f_wrapper(func): @@ -169,9 +141,6 @@ class BLiver: self.logger.debug("heartbeat sended") async def listen(self): - # apply middleware - for mw in self.middlewares: - self.processor.apply_middleware(mw[0], mw[1]) # start listening url, token = get_blive_ws_url(self.real_roomid) async with aiohttp.ClientSession().ws_connect(url) as ws: @@ -179,8 +148,11 @@ class BLiver: await ws.send_bytes( packman.pack(certification(self.real_roomid, token), Operation.AUTH) ) + + # 开始30s发送心跳包的定时任务 self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30) self.scheduler.start() + # 开始监听 while True: msg: WSMessage = await ws.receive()