refactor: add sort operator

This commit is contained in:
acgnhik 2022-07-11 13:05:26 +08:00
parent 133409a81a
commit 3b523ca11a
11 changed files with 178 additions and 17 deletions

View File

@ -6,6 +6,7 @@ from reactivex.scheduler import NewThreadScheduler
from ..bili.live import Live
from ..bili.typing import QualityNumber
from ..flv import operators as flv_ops
from ..utils.mixins import SupportDebugMixin
from .stream_recorder_impl import StreamRecorderImpl
__all__ = ('FLVStreamRecorderImpl',)
@ -14,7 +15,7 @@ __all__ = ('FLVStreamRecorderImpl',)
logger = logging.getLogger(__name__)
class FLVStreamRecorderImpl(StreamRecorderImpl):
class FLVStreamRecorderImpl(StreamRecorderImpl, SupportDebugMixin):
def __init__(
self,
live: Live,
@ -40,6 +41,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl):
filesize_limit=filesize_limit,
duration_limit=duration_limit,
)
self._init_for_debug(live.room_id)
def _run(self) -> None:
self._subscription = (
@ -52,7 +54,7 @@ class FLVStreamRecorderImpl(StreamRecorderImpl):
self._stream_parser,
self._connection_error_handler,
self._request_exception_handler,
flv_ops.process(),
flv_ops.process(sort_tags=True, trace=self._debug),
self._cutter,
self._limiter,
self._join_point_extractor,

View File

@ -9,7 +9,16 @@ from . import scriptdata
from .avc import extract_resolution
from .io import FlvReader
from .io_protocols import RandomIO
from .models import AudioTag, AVCPacketType, FlvTag, ScriptTag, TagType, VideoTag
from .models import (
AudioTag,
AVCPacketType,
CodecID,
FlvTag,
FrameType,
ScriptTag,
TagType,
VideoTag,
)
from .utils import OffsetRepositor
@ -155,8 +164,31 @@ def is_video_nalu_keyframe(tag: FlvTag) -> TypeGuard[VideoTag]:
return is_video_tag(tag) and tag.is_keyframe() and tag.is_avc_nalu()
def is_avc_end_sequence(tag: FlvTag) -> TypeGuard[VideoTag]:
return is_video_tag(tag) and tag.is_avc_end()
def is_avc_end_sequence_tag(value: Any) -> TypeGuard[VideoTag]:
return isinstance(value, FlvTag) and is_avc_end_sequence(value)
def create_avc_end_sequence_tag(offset: int = 0, timestamp: int = 0) -> VideoTag:
return VideoTag(
offset=offset,
filtered=False,
tag_type=TagType.VIDEO,
data_size=5,
timestamp=timestamp,
stream_id=timestamp,
frame_type=FrameType.KEY_FRAME,
codec_id=CodecID.AVC,
avc_packet_type=AVCPacketType.AVC_END_OF_SEQENCE,
composition_time=0,
)
def parse_scriptdata(script_tag: ScriptTag) -> scriptdata.ScriptData:
assert script_tag.body is not None
assert script_tag.body
return scriptdata.load(script_tag.body)
@ -253,8 +285,8 @@ class Resolution:
)
@classmethod
def from_aac_sequence_header(cls, tag: VideoTag) -> Resolution:
def from_avc_sequence_header(cls, tag: VideoTag) -> Resolution:
assert tag.avc_packet_type == AVCPacketType.AVC_SEQUENCE_HEADER
assert tag.body is not None
assert tag.body
width, height = extract_resolution(tag.body)
return cls(width, height)

View File

@ -213,10 +213,10 @@ class FlvDumper:
else:
raise FlvDataError(f'Unsupported tag type: {tag.tag_type}')
if tag.body is None:
self._stream.seek(tag.tag_end_offset)
else:
if tag.body:
self._writer.write(tag.body)
else:
self._stream.seek(tag.tag_end_offset)
def dump_flv_tag_header(self, tag: FlvTag) -> None:
self._writer.write_ui8((int(tag.filtered) << 5) | tag.tag_type.value)

View File

@ -152,7 +152,7 @@ _T = TypeVar('_T', bound='FlvTag')
@attr.s(auto_attribs=True, slots=True, frozen=True, kw_only=True)
class FlvTag(ABC, FlvTagHeader):
offset: int = attr.ib(validator=[non_negative_integer_validator])
body: Optional[bytes] = attr.ib(default=None, repr=cksum)
body: bytes = attr.ib(default=b'', repr=cksum)
def __len__(self) -> int:
return self.tag_size

View File

@ -11,6 +11,7 @@ from .parse import parse
from .probe import Prober, StreamProfile
from .process import process
from .progress import ProgressBar
from .sort import sort
from .split import split
__all__ = (
@ -33,6 +34,7 @@ __all__ = (
'Prober',
'process',
'ProgressBar',
'sort',
'split',
'StreamProfile',
)

View File

@ -328,7 +328,7 @@ class Analyser:
self._keyframe_timestamps.append(tag.timestamp)
self._keyframe_filepositions.append(self.calc_file_size())
if tag.is_avc_header():
self._resolution = Resolution.from_aac_sequence_header(tag)
self._resolution = Resolution.from_avc_sequence_header(tag)
logger.debug(f'Resolution: {self._resolution}')
else:
pass

View File

@ -201,7 +201,7 @@ def concat(
return tag.evolve(timestamp=tag.timestamp + delta)
def make_join_point_tag(next_tag: FlvTag, seamless: bool) -> ScriptTag:
assert next_tag.body is not None
assert next_tag.body
join_point = JoinPoint(
seamless=seamless,
timestamp=float(next_tag.timestamp),
@ -439,7 +439,7 @@ class JoinPointExtractor:
) -> JoinPoint:
script_data = parse_scriptdata(join_point_tag)
join_point_data = cast(JoinPointData, script_data['value'])
assert next_tag.body is not None, next_tag
assert next_tag.body, next_tag
join_point = JoinPoint(
seamless=join_point_data['seamless'],
timestamp=next_tag.timestamp,

View File

@ -107,10 +107,12 @@ def fix() -> Callable[[FLVStream], FLVStream]:
return False
def is_ts_incontinuous(tag: FlvTag) -> bool:
tolerance = 1
if last_tag is None:
return False
return tag.timestamp - last_tag.timestamp > max(
sound_sample_interval, video_frame_interval
return (
tag.timestamp - last_tag.timestamp
> max(sound_sample_interval, video_frame_interval) + tolerance
)
def on_next(item: FLVStreamItem) -> None:

View File

@ -5,7 +5,9 @@ from typing import Callable, Optional
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import create_avc_end_sequence_tag, is_avc_end_sequence
from ..io import FlvReader
from ..models import FlvTag
from .typing import FLVStream, FLVStreamItem
__all__ = ('parse',)
@ -30,6 +32,7 @@ def parse(
subscription = SerialDisposable()
def on_next(stream: io.RawIOBase) -> None:
tag: Optional[FlvTag] = None
try:
try:
reader = FlvReader(
@ -42,6 +45,11 @@ def parse(
tag = reader.read_tag()
observer.on_next(tag)
finally:
if tag is not None and not is_avc_end_sequence(tag):
tag = create_avc_end_sequence_tag(
offset=tag.next_tag_offset, timestamp=tag.timestamp
)
observer.on_next(tag)
stream.close()
except EOFError as e:
if complete_on_eof:

View File

@ -1,9 +1,13 @@
import logging
from typing import Callable
from reactivex import operators as ops
from ..common import is_avc_end_sequence_tag
from .concat import concat
from .defragment import defragment
from .fix import fix
from .sort import sort
from .split import split
from .typing import FLVStream
@ -12,8 +16,26 @@ __all__ = ('process',)
logger = logging.getLogger(__name__)
def process() -> Callable[[FLVStream], FLVStream]:
def process(
sort_tags: bool = False, trace: bool = False
) -> Callable[[FLVStream], FLVStream]:
def _process(source: FLVStream) -> FLVStream:
return source.pipe(defragment(), split(), fix(), concat())
if sort_tags:
return source.pipe(
defragment(),
sort(trace=trace),
ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore
split(),
fix(),
concat(),
)
else:
return source.pipe(
defragment(),
ops.filter(lambda v: not is_avc_end_sequence_tag(v)), # type: ignore
split(),
fix(),
concat(),
)
return _process

View File

@ -0,0 +1,93 @@
import logging
from typing import Callable, List, Optional
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import is_avc_end_sequence, is_video_nalu_keyframe
from ..models import FlvHeader, FlvTag
from .typing import FLVStream, FLVStreamItem
__all__ = ('sort',)
logger = logging.getLogger(__name__)
def sort(trace: bool = False) -> Callable[[FLVStream], FLVStream]:
"Sort tags in GOP by timestamp to ensure subsequent operators work as expected."
def _sort(source: FLVStream) -> FLVStream:
def subscribe(
observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
gop_tags: List[FlvTag] = []
def reset() -> None:
nonlocal gop_tags
gop_tags = []
def push_gop_tags() -> None:
if not gop_tags:
return
gop_tags.sort(key=lambda tag: tag.timestamp)
if trace:
logger.debug(
'Tags in GOP:\n'
f'Number of tags: {len(gop_tags)}\n'
f'Total size of tags: {sum(map(len, gop_tags))}\n'
f'The first tag is {gop_tags[0]}\n'
f'The last tag is {gop_tags[-1]}'
)
for tag in gop_tags:
observer.on_next(tag)
gop_tags.clear()
def on_next(item: FLVStreamItem) -> None:
if isinstance(item, FlvHeader) or is_avc_end_sequence(item):
push_gop_tags()
observer.on_next(item)
return
if is_video_nalu_keyframe(item):
push_gop_tags()
gop_tags.append(item)
else:
gop_tags.append(item)
def on_completed() -> None:
push_gop_tags()
observer.on_completed()
def on_error(exc: Exception) -> None:
push_gop_tags()
observer.on_error(exc)
def dispose() -> None:
nonlocal disposed
disposed = True
if gop_tags:
logger.debug(
'Remaining tags:\n'
f'Number of tags: {len(gop_tags)}\n'
f'Total size of tags: {sum(map(len, gop_tags))}\n'
f'The first tag is {gop_tags[0]}\n'
f'The last tag is {gop_tags[-1]}'
)
reset()
subscription.disposable = source.subscribe(
on_next, on_error, on_completed, scheduler=scheduler
)
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe)
return _sort