Compare commits

...

8 Commits

4 changed files with 16 additions and 2 deletions

View File

@ -85,8 +85,17 @@ class BaseApi(ABC):
exception = None exception = None
for base_url in base_urls: for base_url in base_urls:
url = base_url + path url = base_url + path
try:
while True:
try: try:
return await self._get_json_res(url, *args, **kwds) return await self._get_json_res(url, *args, **kwds)
except ApiRequestError as e:
if e.code == -799:
self._logger.warning('Request to frequently: {}', url)
# delay 15 second
await asyncio.sleep(15)
continue
raise e
except Exception as exc: except Exception as exc:
exception = exc exception = exc
self._logger.trace('Failed to get json from {}: {}', url, repr(exc)) self._logger.trace('Failed to get json from {}: {}', url, repr(exc))

View File

@ -83,16 +83,19 @@ class FlvWriter:
self._dumper = FlvDumper(stream) self._dumper = FlvDumper(stream)
def write_header(self, header: FlvHeader) -> int: def write_header(self, header: FlvHeader) -> int:
return 0
with AutoRollbacker(self._stream): with AutoRollbacker(self._stream):
self._dumper.dump_header(header) self._dumper.dump_header(header)
self._dumper.dump_previous_tag_size(0) self._dumper.dump_previous_tag_size(0)
return header.size + BACK_POINTER_SIZE return header.size + BACK_POINTER_SIZE
def write_tag(self, tag: FlvTag) -> int: def write_tag(self, tag: FlvTag) -> int:
return 0
with AutoRollbacker(self._stream): with AutoRollbacker(self._stream):
self._dumper.dump_tag(tag) self._dumper.dump_tag(tag)
self._dumper.dump_previous_tag_size(tag.tag_size) self._dumper.dump_previous_tag_size(tag.tag_size)
return tag.tag_size + BACK_POINTER_SIZE return tag.tag_size + BACK_POINTER_SIZE
def write_tags(self, tags: Iterable[FlvTag]) -> int: def write_tags(self, tags: Iterable[FlvTag]) -> int:
return 0
return sum(map(self.write_tag, tags)) return sum(map(self.write_tag, tags))

View File

@ -28,7 +28,7 @@ def parse(
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False disposed = True
subscription = SerialDisposable() subscription = SerialDisposable()
def on_next(stream: io.RawIOBase) -> None: def on_next(stream: io.RawIOBase) -> None:

View File

@ -63,6 +63,8 @@ class SegmentDumper:
self._file_closed.on_next(self._path) self._file_closed.on_next(self._path)
def _write_data(self, item: Union[InitSectionData, SegmentData]) -> Tuple[int, int]: def _write_data(self, item: Union[InitSectionData, SegmentData]) -> Tuple[int, int]:
return 0, 0
assert self._file is not None assert self._file is not None
offset = self._file.tell() offset = self._file.tell()
size = self._file.write(item.payload) size = self._file.write(item.payload)