使用Python 3.8的asyncio特性

This commit is contained in:
John Smith 2023-07-29 12:48:57 +08:00
parent e478959a1b
commit 63f541b87e
8 changed files with 37 additions and 36 deletions

View File

@ -129,21 +129,21 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
def open(self):
logger.info('client=%s connected', self.request.remote_ip)
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)
self._refresh_receive_timeout_timer()
def _on_send_heartbeat(self):
self.send_cmd_data(Command.HEARTBEAT, {})
self._heartbeat_timer_handle = asyncio.get_event_loop().call_later(
self._heartbeat_timer_handle = asyncio.get_running_loop().call_later(
self.HEARTBEAT_INTERVAL, self._on_send_heartbeat
)
def _refresh_receive_timeout_timer(self):
if self._receive_timeout_timer_handle is not None:
self._receive_timeout_timer_handle.cancel()
self._receive_timeout_timer_handle = asyncio.get_event_loop().call_later(
self._receive_timeout_timer_handle = asyncio.get_running_loop().call_later(
self.RECEIVE_TIMEOUT, self._on_receive_timeout
)
@ -189,7 +189,7 @@ class ChatHandler(tornado.websocket.WebSocketHandler): # noqa
pass
services.chat.client_room_manager.add_client(self.room_id, self)
asyncio.ensure_future(self._on_joined_room())
asyncio.create_task(self._on_joined_room())
else:
logger.warning('client=%s unknown cmd=%d, body=%s', self.request.remote_ip, cmd, body)

View File

@ -56,7 +56,7 @@ class UploadEmoticonHandler(api.base.ApiHandler): # noqa
if not file.content_type.lower().startswith('image/'):
raise tornado.web.HTTPError(415)
url = await asyncio.get_event_loop().run_in_executor(
url = await asyncio.get_running_loop().run_in_executor(
None, self._save_file, file.body, self.request.remote_ip
)
self.write({

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
import argparse
import asyncio
import logging
import logging.handlers
import os
@ -40,7 +39,7 @@ def main():
init_logging(args.debug)
config.init()
asyncio.get_event_loop().run_until_complete(utils.request.init())
utils.request.init()
models.database.init(args.debug)
services.avatar.init()

View File

@ -20,7 +20,6 @@ logger = logging.getLogger(__name__)
DEFAULT_AVATAR_URL = '//static.hdslb.com/images/member/noface.gif'
_main_event_loop = asyncio.get_event_loop()
# user_id -> avatar_url
_avatar_url_cache: Dict[int, str] = {}
# 正在获取头像的Futureuser_id -> Future
@ -43,7 +42,7 @@ def init():
cfg = config.get_config()
global _uid_queue_to_fetch
_uid_queue_to_fetch = asyncio.Queue(cfg.fetch_avatar_max_queue_size)
asyncio.ensure_future(_get_avatar_url_from_web_consumer())
asyncio.get_event_loop().create_task(_get_avatar_url_from_web_consumer())
async def get_avatar_url(user_id):
@ -71,12 +70,11 @@ def get_avatar_url_from_memory(user_id):
def get_avatar_url_from_database(user_id) -> Awaitable[Optional[str]]:
return asyncio.get_event_loop().run_in_executor(
None, _do_get_avatar_url_from_database, user_id
)
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, _do_get_avatar_url_from_database, user_id, loop)
def _do_get_avatar_url_from_database(user_id):
def _do_get_avatar_url_from_database(user_id, loop: asyncio.AbstractEventLoop):
try:
with models.database.get_session() as session:
user = session.scalars(
@ -94,7 +92,7 @@ def _do_get_avatar_url_from_database(user_id):
_avatar_url_cache.pop(user_id, None)
get_avatar_url_from_web(user_id)
_main_event_loop.call_soon(refresh_cache)
loop.call_soon_threadsafe(refresh_cache)
else:
# 否则只更新内存缓存
_update_avatar_cache_in_memory(user_id, avatar_url)
@ -113,7 +111,7 @@ def get_avatar_url_from_web(user_id) -> Awaitable[Optional[str]]:
if future is not None:
return future
# 否则创建一个获取任务
_uid_fetch_future_map[user_id] = future = _main_event_loop.create_future()
_uid_fetch_future_map[user_id] = future = asyncio.get_running_loop().create_future()
future.add_done_callback(lambda _future: _uid_fetch_future_map.pop(user_id, None))
try:
_uid_queue_to_fetch.put_nowait(user_id)
@ -141,7 +139,7 @@ async def _get_avatar_url_from_web_consumer():
else:
_last_fetch_banned_time = None
asyncio.ensure_future(_get_avatar_url_from_web_coroutine(user_id, future))
asyncio.create_task(_get_avatar_url_from_web_coroutine(user_id, future))
# 限制频率防止被B站ban
cfg = config.get_config()
@ -271,7 +269,7 @@ def process_avatar_url(avatar_url):
def update_avatar_cache(user_id, avatar_url):
_update_avatar_cache_in_memory(user_id, avatar_url)
asyncio.get_event_loop().run_in_executor(
asyncio.get_running_loop().run_in_executor(
None, _update_avatar_cache_in_database, user_id, avatar_url
)

View File

@ -39,7 +39,7 @@ class LiveClientManager:
logger.info('room=%d creating live client', room_id)
self._live_clients[room_id] = live_client = LiveClient(room_id)
live_client.add_handler(_live_msg_handler)
asyncio.ensure_future(self._init_live_client(live_client))
asyncio.create_task(self._init_live_client(live_client))
logger.info('room=%d live client created, %d live clients', room_id, len(self._live_clients))
async def _init_live_client(self, live_client: 'LiveClient'):
@ -56,7 +56,7 @@ class LiveClientManager:
return
logger.info('room=%d removing live client', room_id)
live_client.remove_handler(_live_msg_handler)
asyncio.ensure_future(live_client.stop_and_close())
asyncio.create_task(live_client.stop_and_close())
logger.info('room=%d live client removed, %d live clients', room_id, len(self._live_clients))
client_room_manager.del_room(room_id)
@ -130,7 +130,7 @@ class ClientRoomManager:
def delay_del_room(self, room_id, timeout):
self._clear_delay_del_timer(room_id)
self._delay_del_timer_handles[room_id] = asyncio.get_event_loop().call_later(
self._delay_del_timer_handles[room_id] = asyncio.get_running_loop().call_later(
timeout, self._on_delay_del_room, room_id
)
@ -279,7 +279,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
}
async def _on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
asyncio.ensure_future(self.__on_danmaku(client, message))
asyncio.create_task(self.__on_danmaku(client, message))
async def __on_danmaku(self, client: LiveClient, message: blivedm.DanmakuMessage):
# 先异步调用再获取房间,因为返回时房间可能已经不存在了
@ -364,7 +364,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
})
async def _on_buy_guard(self, client: LiveClient, message: blivedm.GuardBuyMessage):
asyncio.ensure_future(self.__on_buy_guard(client, message))
asyncio.create_task(self.__on_buy_guard(client, message))
@staticmethod
async def __on_buy_guard(client: LiveClient, message: blivedm.GuardBuyMessage):
@ -415,7 +415,7 @@ class LiveMsgHandler(blivedm.BaseHandler):
})
if need_translate:
asyncio.ensure_future(self._translate_and_response(message.message, room.room_id, msg_id))
asyncio.create_task(self._translate_and_response(message.message, room.room_id, msg_id))
async def _on_super_chat_delete(self, client: LiveClient, message: blivedm.SuperChatDeleteMessage):
room = client_room_manager.get_room(client.tmp_room_id)

View File

@ -33,7 +33,7 @@ _text_future_map: Dict[str, asyncio.Future] = {}
def init():
asyncio.ensure_future(_do_init())
asyncio.get_event_loop().create_task(_do_init())
async def _do_init():
@ -100,7 +100,7 @@ def translate(text) -> Awaitable[Optional[str]]:
if future is not None:
return future
# 否则创建一个翻译任务
future = asyncio.get_event_loop().create_future()
future = asyncio.get_running_loop().create_future()
# 查缓存
res = _translate_cache.get(key, None)
@ -168,7 +168,7 @@ class FlowControlTranslateProvider(TranslateProvider):
self._text_queue = asyncio.Queue(max_queue_size)
async def init(self):
asyncio.ensure_future(self._translate_consumer())
asyncio.create_task(self._translate_consumer())
return True
@property
@ -189,7 +189,7 @@ class FlowControlTranslateProvider(TranslateProvider):
while True:
try:
text, future = await self._text_queue.get()
asyncio.ensure_future(self._translate_coroutine(text, future))
asyncio.create_task(self._translate_coroutine(text, future))
# 频率限制
await asyncio.sleep(self._query_interval)
except Exception: # noqa
@ -226,7 +226,7 @@ class TencentTranslateFree(FlowControlTranslateProvider):
return False
if not await self._do_init():
return False
self._reinit_future = asyncio.ensure_future(self._reinit_coroutine())
self._reinit_future = asyncio.create_task(self._reinit_coroutine())
return True
async def _do_init(self):
@ -303,7 +303,7 @@ class TencentTranslateFree(FlowControlTranslateProvider):
while True:
await asyncio.sleep(30)
logger.debug('TencentTranslateFree reinit')
asyncio.ensure_future(self._do_init())
asyncio.create_task(self._do_init())
except asyncio.CancelledError:
pass
@ -515,7 +515,7 @@ class TencentTranslate(FlowControlTranslateProvider):
# 需要手动处理等5分钟
sleep_time = 5 * 60
if sleep_time != 0:
self._cool_down_timer_handle = asyncio.get_event_loop().call_later(
self._cool_down_timer_handle = asyncio.get_running_loop().call_later(
sleep_time, self._on_cool_down_timeout
)
@ -577,7 +577,7 @@ class BaiduTranslate(FlowControlTranslateProvider):
# 账户余额不足需要手动处理等5分钟
sleep_time = 5 * 60
if sleep_time != 0:
self._cool_down_timer_handle = asyncio.get_event_loop().call_later(
self._cool_down_timer_handle = asyncio.get_running_loop().call_later(
sleep_time, self._on_cool_down_timeout
)

View File

@ -9,7 +9,7 @@ VERSION = 'v1.6.2'
def check_update():
asyncio.ensure_future(_do_check_update())
asyncio.get_event_loop().create_task(_do_check_update())
async def _do_check_update():

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import asyncio
from typing import *
import aiohttp
@ -14,7 +15,10 @@ BILIBILI_COMMON_HEADERS = {
http_session: Optional[aiohttp.ClientSession] = None
# ClientSession要在异步函数中创建
async def init():
global http_session
http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
def init():
# ClientSession要在异步函数中创建
async def do_init():
global http_session
http_session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
asyncio.get_event_loop().run_until_complete(do_init())