blivedm/sample.py

55 lines
1.2 KiB
Python
Raw Normal View History

2018-05-13 21:57:36 +08:00
# -*- coding: utf-8 -*-
2019-02-20 00:25:14 +08:00
import asyncio
import sys
from ssl import SSLError
from blivedm import BLiveClient
class MyBLiveClient(BLiveClient):
    """Sample BLiveClient subclass that reports events on the console.

    The underscore-prefixed methods are presumably hooks invoked by
    BLiveClient -- confirm against the base class.
    """

    async def _on_get_popularity(self, popularity):
        """Print the popularity value handed to this hook."""
        print('当前人气值:', popularity)

    async def _on_get_danmaku(self, content, user_name):
        """Print a danmaku message prefixed with the sender's name."""
        print(user_name, '说:', content)

    def _on_stop(self, exc):
        """Schedule self.close() and stop the event loop once it finishes.

        exc is accepted but not used here.
        """
        def _stop_loop(_done_future):
            # Runs after close() has completed, whatever its outcome.
            self._loop.stop()

        closing = asyncio.ensure_future(self.close(), loop=self._loop)
        closing.add_done_callback(_stop_loop)

    def _handle_error(self, exc):
        """Write exc to stderr, with an extra hint for SSL errors.

        Returns False in every case.
        NOTE(review): indentation was lost in the reviewed copy; the
        return is assumed to sit at method level, not inside the `if`.
        """
        print(exc, file=sys.stderr)
        if isinstance(exc, SSLError):
            print('SSL验证失败', file=sys.stderr)
        return False
2018-05-13 21:57:36 +08:00
def main():
    """Start a MyBLiveClient for room 139 and run the event loop.

    The loop runs until something stops it and is always closed on the
    way out, even if run_forever() raises.
    """
    event_loop = asyncio.get_event_loop()

    # 139 is the live room of the streamer 黑桐谷歌.
    # If SSL verification fails, set the second argument to False.
    room_client = MyBLiveClient(139, True)
    room_client.start()

    # For testing: stop after 5 seconds.
    # event_loop.call_later(5, room_client.stop)
    # Stop on Ctrl+C.
    # import signal
    # signal.signal(signal.SIGINT, lambda signum, frame: room_client.stop())

    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
2018-05-13 21:57:36 +08:00
# Run the sample only when this file is executed directly as a script.
if __name__ == '__main__':
    main()