diff --git a/src/blrec/flv/operators/fix.py b/src/blrec/flv/operators/fix.py index 807d246..e2b0911 100644 --- a/src/blrec/flv/operators/fix.py +++ b/src/blrec/flv/operators/fix.py @@ -78,31 +78,48 @@ def fix() -> Callable[[FLVStream], FLVStream]: def update_delta(tag: FlvTag) -> None: nonlocal delta assert last_tag is not None - delta = last_tag.timestamp + delta - tag.timestamp + calc_interval(tag) + assert last_video_tag is not None + assert last_audio_tag is not None + + if is_video_tag(tag): + delta = ( + last_video_tag.timestamp - tag.timestamp + video_frame_interval + ) + elif is_audio_tag(tag): + delta = ( + last_audio_tag.timestamp - tag.timestamp + sound_sample_interval + ) + + if tag.timestamp + delta <= last_tag.timestamp: + if is_video_tag(tag): + delta = ( + last_tag.timestamp - tag.timestamp + video_frame_interval + ) + elif is_audio_tag(tag): + delta = ( + last_tag.timestamp - tag.timestamp + sound_sample_interval + ) def correct_ts(tag: FlvTag) -> FlvTag: if delta == 0: return tag return tag.evolve(timestamp=tag.timestamp + delta) - def calc_interval(tag: FlvTag) -> int: - if is_audio_tag(tag): - return sound_sample_interval - elif is_video_tag(tag): - return video_frame_interval - else: - logger.warning(f'Unexpected tag type: {tag}') - return min(sound_sample_interval, video_frame_interval) - def is_ts_rebounded(tag: FlvTag) -> bool: if is_audio_tag(tag): if last_audio_tag is None: return False - return tag.timestamp < last_audio_tag.timestamp + if last_audio_tag.is_aac_header(): + return tag.timestamp + delta < last_audio_tag.timestamp + else: + return tag.timestamp + delta <= last_audio_tag.timestamp elif is_video_tag(tag): if last_video_tag is None: return False - return tag.timestamp < last_video_tag.timestamp + if last_video_tag.is_avc_header(): + return tag.timestamp + delta < last_video_tag.timestamp + else: + return tag.timestamp + delta <= last_video_tag.timestamp else: return False @@ -111,7 +128,7 @@ def fix() -> Callable[[FLVStream], FLVStream]: if last_tag is None: return False return ( - tag.timestamp - last_tag.timestamp + tag.timestamp + delta - last_tag.timestamp > max(sound_sample_interval, video_frame_interval) + tolerance ) @@ -133,8 +150,9 @@ def fix() -> Callable[[FLVStream], FLVStream]: update_delta(tag) logger.warning( f'Timestamp rebounded, updated delta: {delta}\n' - f'last audio tag: {last_audio_tag}\n' + f'last tag: {last_tag}\n' f'last video tag: {last_video_tag}\n' + f'last audio tag: {last_audio_tag}\n' f'current tag: {tag}' ) elif is_ts_incontinuous(tag): @@ -142,11 +160,13 @@ def fix() -> Callable[[FLVStream], FLVStream]: logger.warning( f'Timestamp incontinuous, updated delta: {delta}\n' f'last tag: {last_tag}\n' + f'last video tag: {last_video_tag}\n' + f'last audio tag: {last_audio_tag}\n' f'current tag: {tag}' ) - update_last_tags(tag) tag = correct_ts(tag) + update_last_tags(tag) observer.on_next(tag) def dispose() -> None: diff --git a/src/blrec/flv/operators/process.py b/src/blrec/flv/operators/process.py index e878256..da88308 100644 --- a/src/blrec/flv/operators/process.py +++ b/src/blrec/flv/operators/process.py @@ -5,6 +5,7 @@ from reactivex import operators as ops from ..common import is_avc_end_sequence_tag from .concat import concat +from .correct import correct from .defragment import defragment from .fix import fix from .sort import sort @@ -23,17 +24,19 @@ def process( if sort_tags: return source.pipe( defragment(), + split(), sort(trace=trace), ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore - split(), + correct(), fix(), concat(), ) else: return source.pipe( defragment(), - ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore split(), + ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore + correct(), fix(), concat(), ) diff --git a/src/blrec/flv/operators/sort.py b/src/blrec/flv/operators/sort.py index 547c431..e3c8676 100644 --- a/src/blrec/flv/operators/sort.py +++ b/src/blrec/flv/operators/sort.py @@ -4,8 +4,17 @@ from typing import Callable, List, Optional from reactivex import Observable, abc from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable -from ..common import is_avc_end_sequence, is_video_nalu_keyframe -from ..models import FlvHeader, FlvTag +from ..common import ( + find_aac_header_tag, + find_avc_header_tag, + find_metadata_tag, + is_audio_tag, + is_avc_end_sequence, + is_script_tag, + is_video_nalu_keyframe, + is_video_tag, +) +from ..models import AudioTag, FlvHeader, FlvTag, ScriptTag, VideoTag from .typing import FLVStream, FLVStreamItem __all__ = ('sort',) @@ -34,7 +43,6 @@ def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]: if not gop_tags: return - gop_tags.sort(key=lambda tag: tag.timestamp) if trace: logger.debug( 'Tags in GOP:\n' @@ -44,7 +52,40 @@ def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]: f'The last tag is {gop_tags[-1]}' ) + if len(gop_tags) < 10: + avc_header_tag = find_avc_header_tag(gop_tags) + aac_header_tag = find_aac_header_tag(gop_tags) + if avc_header_tag is not None and aac_header_tag is not None: + if (metadata_tag := find_metadata_tag(gop_tags)) is not None: + observer.on_next(metadata_tag) + observer.on_next(avc_header_tag) + observer.on_next(aac_header_tag) + gop_tags.clear() + return + + script_tags: List[ScriptTag] = [] + video_tags: List[VideoTag] = [] + audio_tags: List[AudioTag] = [] for tag in gop_tags: + if is_video_tag(tag): + video_tags.append(tag) + elif is_audio_tag(tag): + audio_tags.append(tag) + elif is_script_tag(tag): + script_tags.append(tag) + + sorted_tags: List[FlvTag] = [] + i = len(audio_tags) - 1 + for video_tag in reversed(video_tags): + sorted_tags.insert(0, video_tag) + while i >= 0 and audio_tags[i].timestamp >= video_tag.timestamp: + sorted_tags.insert(1, audio_tags[i]) + i -= 1 + + for tag in script_tags: + observer.on_next(tag) + + for tag in sorted_tags: observer.on_next(tag) gop_tags.clear() diff --git a/src/blrec/flv/operators/split.py b/src/blrec/flv/operators/split.py index 4fa89ce..7994181 100644 --- a/src/blrec/flv/operators/split.py +++ b/src/blrec/flv/operators/split.py @@ -4,9 +4,13 @@ from typing import Callable, Optional from reactivex import Observable, abc from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable -from ..common import is_audio_sequence_header, is_metadata_tag, is_video_sequence_header +from ..common import ( + is_audio_sequence_header, + is_metadata_tag, + is_video_sequence_header, + parse_metadata, +) from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag -from .correct import correct from .typing import FLVStream, FLVStreamItem __all__ = ('split',) @@ -66,7 +70,11 @@ def split() -> Callable[[FLVStream], FLVStream]: tag = item if is_metadata_tag(tag): - logger.debug(f'Metadata tag: {tag}') + metadata = parse_metadata(tag) + logger.debug(f'Metadata tag: {tag}, metadata: {metadata}') + if last_metadata_tag is not None: + last_metadata_tag = tag + return last_metadata_tag = tag elif is_audio_sequence_header(tag): logger.debug(f'Audio sequence header: {tag}') @@ -106,6 +114,6 @@ def split() -> Callable[[FLVStream], FLVStream]: return CompositeDisposable(subscription, Disposable(dispose)) - return Observable(subscribe).pipe(correct()) + return Observable(subscribe) return _split