Compare commits

...

8 Commits

4 changed files with 16 additions and 2 deletions

View File

@ -86,7 +86,16 @@ class BaseApi(ABC):
for base_url in base_urls:
url = base_url + path
try:
return await self._get_json_res(url, *args, **kwds)
while True:
try:
return await self._get_json_res(url, *args, **kwds)
except ApiRequestError as e:
if e.code == -799:
self._logger.warning('Request to frequently: {}', url)
# delay 15 second
await asyncio.sleep(15)
continue
raise e
except Exception as exc:
exception = exc
self._logger.trace('Failed to get json from {}: {}', url, repr(exc))

View File

@ -83,16 +83,19 @@ class FlvWriter:
self._dumper = FlvDumper(stream)
def write_header(self, header: FlvHeader) -> int:
return 0
with AutoRollbacker(self._stream):
self._dumper.dump_header(header)
self._dumper.dump_previous_tag_size(0)
return header.size + BACK_POINTER_SIZE
def write_tag(self, tag: FlvTag) -> int:
return 0
with AutoRollbacker(self._stream):
self._dumper.dump_tag(tag)
self._dumper.dump_previous_tag_size(tag.tag_size)
return tag.tag_size + BACK_POINTER_SIZE
def write_tags(self, tags: Iterable[FlvTag]) -> int:
return 0
return sum(map(self.write_tag, tags))

View File

@ -28,7 +28,7 @@ def parse(
observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
disposed = True
subscription = SerialDisposable()
def on_next(stream: io.RawIOBase) -> None:

View File

@ -63,6 +63,8 @@ class SegmentDumper:
self._file_closed.on_next(self._path)
def _write_data(self, item: Union[InitSectionData, SegmentData]) -> Tuple[int, int]:
return 0, 0
assert self._file is not None
offset = self._file.tell()
size = self._file.write(item.payload)