remove middleware for bad impl and update README

This commit is contained in:
Cam 2022-01-08 14:52:18 +08:00
parent 14c641528a
commit 7812c2fe66
2 changed files with 12 additions and 40 deletions

View File

@ -65,4 +65,4 @@
- 打包发布
- 更多的消息操作类
- ~~尝试加入中间件架构~~ 已经加入
- 尝试加入中间件架构

View File

@ -1,8 +1,6 @@
import sys
import json
import asyncio
from contextvars import ContextVar
from abc import ABC, abstractmethod
from typing import Awaitable, Dict, List, Tuple, Union
import aiohttp
from aiohttp.client_ws import ClientWebSocketResponse
@ -34,35 +32,17 @@ class BLiverCtx(object):
self.body: Dict = None # 消息内容
class Middleware(ABC):
def warp(self, func):
self.__func = func
return self
async def __call__(self, ctx: BLiverCtx):
return await self.proxy(ctx, self.__func)
@abstractmethod
async def proxy(self, ctx: BLiverCtx, func: Awaitable):
raise NotImplementedError
@abstractmethod
def register_self(self, app: "BLiver"):
raise NotImplementedError
# app.register_middleware(Events.DANMU_MSG,self)
class Channel:
"""
消息类型,不直接用list代替的原因是方便后面加入middleware类
"""
def __init__(self) -> None:
self.listeners: List[Union[Middleware, Awaitable]] = []
self.middlewares: List[Middleware] = []
self.listeners = []
def register_handler(self, handler):
self.listeners.append(handler)
def apply_middleware(self, middleware: Middleware):
self.listeners = [middleware.warp(h) for h in self.listeners]
def __iter__(self):
return iter(self.listeners)
@ -75,18 +55,14 @@ class Processor:
self.channels[e] = Channel()
def register(self, channel: str, handler: Awaitable):
channel = self.channels.get(channel, None)
channel = self.channels[channel]
channel.register_handler(handler)
def apply_middleware(self, channel, middleware):
channel = self.channels.get(channel, None)
channel.apply_middleware(middleware)
async def process(self, ctx):
header: PackageHeader = ctx.msg[0]
msg = json.loads(ctx.msg[1])
ctx.body = msg
if header.operation == Operation.NOTIFY:
msg = json.loads(ctx.msg[1])
ctx.body = msg
listeners = self.channels.get(msg["cmd"], []) # 根据cmd 得到相应的处理句柄
return await asyncio.gather(*[f(ctx) for f in listeners])
@ -104,10 +80,6 @@ class BLiver:
self._ws: ClientWebSocketResponse = None
self.scheduler = AsyncIOScheduler(timezone="Asia/ShangHai")
self.processor = Processor(logger=self.logger)
self.middlewares = []
def register_middleware(self, channel: str, middleware: Middleware):
self.middlewares.append((channel, middleware))
def on(self, event: Union[Events, List[Events]]):
def f_wrapper(func):
@ -169,9 +141,6 @@ class BLiver:
self.logger.debug("heartbeat sended")
async def listen(self):
# apply middleware
for mw in self.middlewares:
self.processor.apply_middleware(mw[0], mw[1])
# start listening
url, token = get_blive_ws_url(self.real_roomid)
async with aiohttp.ClientSession().ws_connect(url) as ws:
@ -179,8 +148,11 @@ class BLiver:
await ws.send_bytes(
packman.pack(certification(self.real_roomid, token), Operation.AUTH)
)
# 开始30s发送心跳包的定时任务
self.scheduler.add_job(self.heartbeat, trigger="interval", seconds=30)
self.scheduler.start()
# 开始监听
while True:
msg: WSMessage = await ws.receive()