refactor: refactor operators

This commit is contained in:
acgnhik 2022-07-17 11:17:28 +08:00
parent 1370cecea8
commit fdd93e70a5
4 changed files with 96 additions and 24 deletions

View File

@ -78,31 +78,48 @@ def fix() -> Callable[[FLVStream], FLVStream]:
def update_delta(tag: FlvTag) -> None: def update_delta(tag: FlvTag) -> None:
nonlocal delta nonlocal delta
assert last_tag is not None assert last_tag is not None
delta = last_tag.timestamp + delta - tag.timestamp + calc_interval(tag) assert last_video_tag is not None
assert last_audio_tag is not None
if is_video_tag(tag):
delta = (
last_video_tag.timestamp - tag.timestamp + video_frame_interval
)
elif is_audio_tag(tag):
delta = (
last_audio_tag.timestamp - tag.timestamp + sound_sample_interval
)
if tag.timestamp + delta <= last_tag.timestamp:
if is_video_tag(tag):
delta = (
last_tag.timestamp - tag.timestamp + video_frame_interval
)
elif is_audio_tag(tag):
delta = (
last_tag.timestamp - tag.timestamp + sound_sample_interval
)
def correct_ts(tag: FlvTag) -> FlvTag: def correct_ts(tag: FlvTag) -> FlvTag:
if delta == 0: if delta == 0:
return tag return tag
return tag.evolve(timestamp=tag.timestamp + delta) return tag.evolve(timestamp=tag.timestamp + delta)
def calc_interval(tag: FlvTag) -> int:
if is_audio_tag(tag):
return sound_sample_interval
elif is_video_tag(tag):
return video_frame_interval
else:
logger.warning(f'Unexpected tag type: {tag}')
return min(sound_sample_interval, video_frame_interval)
def is_ts_rebounded(tag: FlvTag) -> bool: def is_ts_rebounded(tag: FlvTag) -> bool:
if is_audio_tag(tag): if is_audio_tag(tag):
if last_audio_tag is None: if last_audio_tag is None:
return False return False
return tag.timestamp < last_audio_tag.timestamp if last_audio_tag.is_aac_header():
return tag.timestamp + delta < last_audio_tag.timestamp
else:
return tag.timestamp + delta <= last_audio_tag.timestamp
elif is_video_tag(tag): elif is_video_tag(tag):
if last_video_tag is None: if last_video_tag is None:
return False return False
return tag.timestamp < last_video_tag.timestamp if last_video_tag.is_avc_header():
return tag.timestamp + delta < last_video_tag.timestamp
else:
return tag.timestamp + delta <= last_video_tag.timestamp
else: else:
return False return False
@ -111,7 +128,7 @@ def fix() -> Callable[[FLVStream], FLVStream]:
if last_tag is None: if last_tag is None:
return False return False
return ( return (
tag.timestamp - last_tag.timestamp tag.timestamp + delta - last_tag.timestamp
> max(sound_sample_interval, video_frame_interval) + tolerance > max(sound_sample_interval, video_frame_interval) + tolerance
) )
@ -133,8 +150,9 @@ def fix() -> Callable[[FLVStream], FLVStream]:
update_delta(tag) update_delta(tag)
logger.warning( logger.warning(
f'Timestamp rebounded, updated delta: {delta}\n' f'Timestamp rebounded, updated delta: {delta}\n'
f'last audio tag: {last_audio_tag}\n' f'last tag: {last_tag}\n'
f'last video tag: {last_video_tag}\n' f'last video tag: {last_video_tag}\n'
f'last audio tag: {last_audio_tag}\n'
f'current tag: {tag}' f'current tag: {tag}'
) )
elif is_ts_incontinuous(tag): elif is_ts_incontinuous(tag):
@ -142,11 +160,13 @@ def fix() -> Callable[[FLVStream], FLVStream]:
logger.warning( logger.warning(
f'Timestamp incontinuous, updated delta: {delta}\n' f'Timestamp incontinuous, updated delta: {delta}\n'
f'last tag: {last_tag}\n' f'last tag: {last_tag}\n'
f'last video tag: {last_video_tag}\n'
f'last audio tag: {last_audio_tag}\n'
f'current tag: {tag}' f'current tag: {tag}'
) )
update_last_tags(tag)
tag = correct_ts(tag) tag = correct_ts(tag)
update_last_tags(tag)
observer.on_next(tag) observer.on_next(tag)
def dispose() -> None: def dispose() -> None:

View File

@ -5,6 +5,7 @@ from reactivex import operators as ops
from ..common import is_avc_end_sequence_tag from ..common import is_avc_end_sequence_tag
from .concat import concat from .concat import concat
from .correct import correct
from .defragment import defragment from .defragment import defragment
from .fix import fix from .fix import fix
from .sort import sort from .sort import sort
@ -23,17 +24,19 @@ def process(
if sort_tags: if sort_tags:
return source.pipe( return source.pipe(
defragment(), defragment(),
split(),
sort(trace=trace), sort(trace=trace),
ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore
split(), correct(),
fix(), fix(),
concat(), concat(),
) )
else: else:
return source.pipe( return source.pipe(
defragment(), defragment(),
ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore
split(), split(),
ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore
correct(),
fix(), fix(),
concat(), concat(),
) )

View File

@ -4,8 +4,17 @@ from typing import Callable, List, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import is_avc_end_sequence, is_video_nalu_keyframe from ..common import (
from ..models import FlvHeader, FlvTag find_aac_header_tag,
find_avc_header_tag,
find_metadata_tag,
is_audio_tag,
is_avc_end_sequence,
is_script_tag,
is_video_nalu_keyframe,
is_video_tag,
)
from ..models import AudioTag, FlvHeader, FlvTag, ScriptTag, VideoTag
from .typing import FLVStream, FLVStreamItem from .typing import FLVStream, FLVStreamItem
__all__ = ('sort',) __all__ = ('sort',)
@ -34,7 +43,6 @@ def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]:
if not gop_tags: if not gop_tags:
return return
gop_tags.sort(key=lambda tag: tag.timestamp)
if trace: if trace:
logger.debug( logger.debug(
'Tags in GOP:\n' 'Tags in GOP:\n'
@ -44,7 +52,40 @@ def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]:
f'The last tag is {gop_tags[-1]}' f'The last tag is {gop_tags[-1]}'
) )
if len(gop_tags) < 10:
avc_header_tag = find_avc_header_tag(gop_tags)
aac_header_tag = find_aac_header_tag(gop_tags)
if avc_header_tag is not None and aac_header_tag is not None:
if (metadata_tag := find_metadata_tag(gop_tags)) is not None:
observer.on_next(metadata_tag)
observer.on_next(avc_header_tag)
observer.on_next(aac_header_tag)
gop_tags.clear()
return
script_tags: List[ScriptTag] = []
video_tags: List[VideoTag] = []
audio_tags: List[AudioTag] = []
for tag in gop_tags: for tag in gop_tags:
if is_video_tag(tag):
video_tags.append(tag)
elif is_audio_tag(tag):
audio_tags.append(tag)
elif is_script_tag(tag):
script_tags.append(tag)
sorted_tags: List[FlvTag] = []
i = len(audio_tags) - 1
for video_tag in reversed(video_tags):
sorted_tags.insert(0, video_tag)
while i >= 0 and audio_tags[i].timestamp >= video_tag.timestamp:
sorted_tags.insert(1, audio_tags[i])
i -= 1
for tag in script_tags:
observer.on_next(tag)
for tag in sorted_tags:
observer.on_next(tag) observer.on_next(tag)
gop_tags.clear() gop_tags.clear()

View File

@ -4,9 +4,13 @@ from typing import Callable, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import is_audio_sequence_header, is_metadata_tag, is_video_sequence_header from ..common import (
is_audio_sequence_header,
is_metadata_tag,
is_video_sequence_header,
parse_metadata,
)
from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag
from .correct import correct
from .typing import FLVStream, FLVStreamItem from .typing import FLVStream, FLVStreamItem
__all__ = ('split',) __all__ = ('split',)
@ -66,7 +70,11 @@ def split() -> Callable[[FLVStream], FLVStream]:
tag = item tag = item
if is_metadata_tag(tag): if is_metadata_tag(tag):
logger.debug(f'Metadata tag: {tag}') metadata = parse_metadata(tag)
logger.debug(f'Metadata tag: {tag}, metadata: {metadata}')
if last_metadata_tag is not None:
last_metadata_tag = tag
return
last_metadata_tag = tag last_metadata_tag = tag
elif is_audio_sequence_header(tag): elif is_audio_sequence_header(tag):
logger.debug(f'Audio sequence header: {tag}') logger.debug(f'Audio sequence header: {tag}')
@ -106,6 +114,6 @@ def split() -> Callable[[FLVStream], FLVStream]:
return CompositeDisposable(subscription, Disposable(dispose)) return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe).pipe(correct()) return Observable(subscribe)
return _split return _split