refactor: refactor correct operator

This commit is contained in:
acgnhik 2022-06-08 18:25:48 +08:00
parent a9d8de964d
commit 8b2444f6cc
2 changed files with 26 additions and 6 deletions
src/blrec/flv/operators

View File

@ -21,6 +21,7 @@ def correct() -> Callable[[FLVStream], FLVStream]:
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
delta: Optional[int] = None
first_data_tag: Optional[FlvTag] = None
def correct_ts(tag: FlvTag, delta: int) -> FlvTag:
if delta == 0:
@ -29,9 +30,11 @@ def correct() -> Callable[[FLVStream], FLVStream]:
def on_next(item: FLVStreamItem) -> None:
nonlocal delta
nonlocal first_data_tag
if isinstance(item, FlvHeader):
delta = None
first_data_tag = None
observer.on_next(item)
return
@ -45,14 +48,28 @@ def correct() -> Callable[[FLVStream], FLVStream]:
if delta is None:
if is_sequence_header(tag):
tag = correct_ts(tag, -tag.timestamp)
observer.on_next(tag)
else:
logger.debug(f'The first data tag: {tag}')
delta = -tag.timestamp
if first_data_tag is None:
first_data_tag = tag
logger.debug(f'The first data tag: {first_data_tag}')
else:
second_data_tag = tag
logger.debug(f'The second data tag: {second_data_tag}')
if second_data_tag.timestamp >= first_data_tag.timestamp:
delta = -first_data_tag.timestamp
logger.debug(f'Timestamp delta: {delta}')
tag = correct_ts(tag, delta)
observer.on_next(correct_ts(first_data_tag, delta))
observer.on_next(correct_ts(second_data_tag, delta))
else:
tag = correct_ts(tag, delta)
delta = -second_data_tag.timestamp
logger.debug(f'Timestamp delta: {delta}')
observer.on_next(correct_ts(second_data_tag, delta))
observer.on_next(correct_ts(first_data_tag, delta))
first_data_tag = None
return
tag = correct_ts(tag, delta)
observer.on_next(tag)
return source.subscribe(

View File

@ -90,6 +90,9 @@ class Dumper:
self._timestamp_updates.on_next(0)
else:
if self._flv_writer is not None:
# XXX: negative timestamp will cause
# `struct.error: ubyte format requires 0 <= number <= 255`
assert item.timestamp >= 0, item
size = self._flv_writer.write_tag(item)
self._size_updates.on_next(size)
self._timestamp_updates.on_next(item.timestamp)