refactor: refactor operators

This commit is contained in:
acgnhik 2022-06-25 14:27:34 +08:00
parent ae6058146d
commit 8d720cecc5
16 changed files with 187 additions and 16 deletions

View File

@ -5,6 +5,7 @@ import logging
from typing import List, Optional, Union from typing import List, Optional, Union
from reactivex import Observable, Subject, abc from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ...utils.ffprobe import StreamProfile, ffprobe from ...utils.ffprobe import StreamProfile, ffprobe
from .segment_fetcher import InitSectionData, SegmentData from .segment_fetcher import InitSectionData, SegmentData
@ -39,6 +40,9 @@ class HLSProber:
observer: abc.ObserverBase[Union[InitSectionData, SegmentData]], observer: abc.ObserverBase[Union[InitSectionData, SegmentData]],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
self._reset() self._reset()
def on_next(item: Union[InitSectionData, SegmentData]) -> None: def on_next(item: Union[InitSectionData, SegmentData]) -> None:
@ -59,10 +63,17 @@ class HLSProber:
observer.on_next(item) observer.on_next(item)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
def _do_probe(self) -> None: def _do_probe(self) -> None:

View File

@ -29,6 +29,7 @@ class PlaylistResolver:
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False disposed = False
subscription = SerialDisposable() subscription = SerialDisposable()
last_seg_uris: OrderedSet[str] = OrderedSet() last_seg_uris: OrderedSet[str] = OrderedSet()
def on_next(playlist: m3u8.M3U8) -> None: def on_next(playlist: m3u8.M3U8) -> None:

View File

@ -55,6 +55,7 @@ class SegmentFetcher:
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False disposed = False
subscription = SerialDisposable() subscription = SerialDisposable()
init_section: Optional[InitializationSection] = None init_section: Optional[InitializationSection] = None
def on_next(seg: m3u8.Segment) -> None: def on_next(seg: m3u8.Segment) -> None:
@ -76,7 +77,9 @@ class SegmentFetcher:
def dispose() -> None: def dispose() -> None:
nonlocal disposed nonlocal disposed
nonlocal init_section
disposed = True disposed = True
init_section = None
subscription.disposable = source.subscribe( subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler

View File

@ -21,7 +21,8 @@ logging.getLogger(urllib3.__name__).setLevel(logging.WARNING)
class SegmentRemuxer: class SegmentRemuxer:
_MAX_SEGMENT_DATA_CACHE: Final = 3 _SEGMENT_DATA_CACHE: Final = 10
_MAX_SEGMENT_DATA_CACHE: Final = 15
def __init__(self, live: Live) -> None: def __init__(self, live: Live) -> None:
self._live = live self._live = live
@ -47,6 +48,12 @@ class SegmentRemuxer:
segment_data_cache: List[bytes] = [] segment_data_cache: List[bytes] = []
self._stream_remuxer.stop() self._stream_remuxer.stop()
def reset() -> None:
nonlocal init_section_data, segment_data_cache
init_section_data = None
segment_data_cache = []
self._stream_remuxer.stop()
def write(data: bytes) -> int: def write(data: bytes) -> int:
return wait_for( return wait_for(
self._stream_remuxer.input.write, self._stream_remuxer.input.write,
@ -56,6 +63,7 @@ class SegmentRemuxer:
def on_next(data: Union[InitSectionData, SegmentData]) -> None: def on_next(data: Union[InitSectionData, SegmentData]) -> None:
nonlocal init_section_data nonlocal init_section_data
nonlocal segment_data_cache
if isinstance(data, InitSectionData): if isinstance(data, InitSectionData):
init_section_data = data.payload init_section_data = data.payload
@ -86,14 +94,22 @@ class SegmentRemuxer:
except Exception as e: except Exception as e:
logger.warning(f'Failed to write data to stream remuxer: {repr(e)}') logger.warning(f'Failed to write data to stream remuxer: {repr(e)}')
self._stream_remuxer.stop() self._stream_remuxer.stop()
if len(segment_data_cache) >= self._MAX_SEGMENT_DATA_CACHE:
segment_data_cache = segment_data_cache[
-self._MAX_SEGMENT_DATA_CACHE + 1 :
]
else:
if len(segment_data_cache) >= self._SEGMENT_DATA_CACHE:
segment_data_cache = segment_data_cache[
-self._SEGMENT_DATA_CACHE + 1 :
]
segment_data_cache.append(data.payload) segment_data_cache.append(data.payload)
if len(segment_data_cache) > self._MAX_SEGMENT_DATA_CACHE:
segment_data_cache.pop(0)
def dispose() -> None: def dispose() -> None:
nonlocal disposed nonlocal disposed
disposed = True disposed = True
reset()
subscription.disposable = source.subscribe( subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler

View File

@ -1,7 +1,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import time
import logging import logging
from datetime import datetime from datetime import datetime
from typing import Iterator, Optional from typing import Iterator, Optional

View File

@ -328,6 +328,7 @@ class StreamRecorderImpl(
def _dispose(self) -> None: def _dispose(self) -> None:
self._subscription.dispose() self._subscription.dispose()
del self._subscription
self._on_completed() self._on_completed()
def _on_completed(self) -> None: def _on_completed(self) -> None:

View File

@ -22,11 +22,12 @@ class MetadataDumper(SwitchableMixin):
joinpoint_extractor: flv_ops.JoinPointExtractor, joinpoint_extractor: flv_ops.JoinPointExtractor,
) -> None: ) -> None:
super().__init__() super().__init__()
self._dumper = dumper self._dumper = dumper
self._analyser = analyser self._analyser = analyser
self._joinpoint_extractor = joinpoint_extractor self._joinpoint_extractor = joinpoint_extractor
self._reset()
def _reset(self) -> None:
self._last_metadata: Optional[flv_ops.MetaData] = None self._last_metadata: Optional[flv_ops.MetaData] = None
self._last_join_points: Optional[List[flv_ops.JoinPoint]] = None self._last_join_points: Optional[List[flv_ops.JoinPoint]] = None
@ -40,6 +41,7 @@ class MetadataDumper(SwitchableMixin):
self._file_closed_subscription = self._dumper.file_closed.subscribe( self._file_closed_subscription = self._dumper.file_closed.subscribe(
self._dump_metadata self._dump_metadata
) )
self._reset()
logger.debug('Enabled metadata dumper') logger.debug('Enabled metadata dumper')
def _do_disable(self) -> None: def _do_disable(self) -> None:
@ -49,6 +51,7 @@ class MetadataDumper(SwitchableMixin):
self._join_points_subscription.dispose() self._join_points_subscription.dispose()
with suppress(Exception): with suppress(Exception):
self._file_closed_subscription.dispose() self._file_closed_subscription.dispose()
self._reset()
logger.debug('Disabled metadata dumper') logger.debug('Disabled metadata dumper')
def _update_metadata(self, metadata: Optional[flv_ops.MetaData]) -> None: def _update_metadata(self, metadata: Optional[flv_ops.MetaData]) -> None:

View File

@ -237,9 +237,12 @@ class Analyser:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
stream_index: int = -1 disposed = False
subscription = SerialDisposable() subscription = SerialDisposable()
self._reset()
stream_index: int = -1
def push_metadata() -> None: def push_metadata() -> None:
try: try:
metadata = self.make_metadata() metadata = self.make_metadata()
@ -270,7 +273,10 @@ class Analyser:
observer.on_error(e) observer.on_error(e)
def dispose() -> None: def dispose() -> None:
nonlocal disposed
disposed = True
push_metadata() push_metadata()
self._reset()
subscription.disposable = source.subscribe( subscription.disposable = source.subscribe(
on_next, on_error, on_completed, scheduler=scheduler on_next, on_error, on_completed, scheduler=scheduler

View File

@ -85,6 +85,9 @@ def concat(
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
delta: int = 0 delta: int = 0
action: ACTION = ACTION.NOOP action: ACTION = ACTION.NOOP
last_tags: List[FlvTag] = [] last_tags: List[FlvTag] = []
@ -93,6 +96,22 @@ def concat(
last_audio_sequence_header: Optional[AudioTag] = None last_audio_sequence_header: Optional[AudioTag] = None
last_video_sequence_header: Optional[VideoTag] = None last_video_sequence_header: Optional[VideoTag] = None
def reset() -> None:
nonlocal delta
nonlocal action
nonlocal last_tags
nonlocal gathered_tags
nonlocal last_flv_header
nonlocal last_audio_sequence_header
nonlocal last_video_sequence_header
delta = 0
action = ACTION.NOOP
last_tags = []
gathered_tags = []
last_flv_header = None
last_audio_sequence_header = None
last_video_sequence_header = None
def update_last_tags(tag: FlvTag) -> None: def update_last_tags(tag: FlvTag) -> None:
nonlocal last_audio_sequence_header, last_video_sequence_header nonlocal last_audio_sequence_header, last_video_sequence_header
last_tags.append(tag) last_tags.append(tag)
@ -315,10 +334,17 @@ def concat(
do_concat() do_concat()
observer.on_error(e) observer.on_error(e)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, on_error, on_completed, scheduler=scheduler on_next, on_error, on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
return _concat return _concat
@ -340,11 +366,19 @@ class JoinPointExtractor:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
stream_index: int = -1 disposed = False
subscription = SerialDisposable() subscription = SerialDisposable()
stream_index: int = -1
join_points: List[JoinPoint] = [] join_points: List[JoinPoint] = []
join_point_tag: Optional[ScriptTag] = None join_point_tag: Optional[ScriptTag] = None
def reset() -> None:
nonlocal stream_index, join_points, join_point_tag
stream_index = -1
join_points = []
join_point_tag = None
def push_join_points() -> None: def push_join_points() -> None:
self._join_points.on_next(join_points.copy()) self._join_points.on_next(join_points.copy())
@ -381,7 +415,10 @@ class JoinPointExtractor:
observer.on_error(e) observer.on_error(e)
def dispose() -> None: def dispose() -> None:
nonlocal disposed
disposed = True
push_join_points() push_join_points()
reset()
subscription.disposable = source.subscribe( subscription.disposable = source.subscribe(
on_next, on_error, on_completed, scheduler=scheduler on_next, on_error, on_completed, scheduler=scheduler
@ -409,4 +446,11 @@ class JoinPointExtractor:
crc32=join_point_data['crc32'], crc32=join_point_data['crc32'],
) )
logger.debug(f'Extracted join point: {join_point}; next tag: {next_tag}') logger.debug(f'Extracted join point: {join_point}; next tag: {next_tag}')
if cksum(next_tag.body) != join_point_data['crc32']:
logger.warning(
f'Timestamp of extracted join point may be incorrect\n'
f'join point data: {join_point_data}\n'
f'join point tag: {join_point_tag}\n'
f'next tag: {next_tag}\n'
)
return join_point return join_point

View File

@ -2,6 +2,7 @@ import logging
from typing import Callable, Optional from typing import Callable, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import is_script_tag, is_sequence_header from ..common import is_script_tag, is_sequence_header
from ..models import FlvHeader, FlvTag from ..models import FlvHeader, FlvTag
@ -20,9 +21,17 @@ def correct() -> Callable[[FLVStream], FLVStream]:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
delta: Optional[int] = None delta: Optional[int] = None
first_data_tag: Optional[FlvTag] = None first_data_tag: Optional[FlvTag] = None
def reset() -> None:
nonlocal delta, first_data_tag
delta = None
first_data_tag = None
def correct_ts(tag: FlvTag, delta: int) -> FlvTag: def correct_ts(tag: FlvTag, delta: int) -> FlvTag:
if delta == 0: if delta == 0:
return tag return tag
@ -72,10 +81,17 @@ def correct() -> Callable[[FLVStream], FLVStream]:
tag = correct_ts(tag, delta) tag = correct_ts(tag, delta)
observer.on_next(tag) observer.on_next(tag)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
return _correct return _correct

View File

@ -4,6 +4,7 @@ import logging
from typing import Optional from typing import Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import ( from ..common import (
is_audio_sequence_header, is_audio_sequence_header,
@ -58,6 +59,9 @@ class Cutter:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
def on_next(item: FLVStreamItem) -> None: def on_next(item: FLVStreamItem) -> None:
if isinstance(item, FlvHeader): if isinstance(item, FlvHeader):
self._reset() self._reset()
@ -71,10 +75,17 @@ class Cutter:
self._triggered = False self._triggered = False
observer.on_next(item) observer.on_next(item)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
def _update_flv_header(self, header: FlvHeader) -> None: def _update_flv_header(self, header: FlvHeader) -> None:

View File

@ -2,6 +2,7 @@ import logging
from typing import Callable, List, Optional from typing import Callable, List, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..models import FlvHeader from ..models import FlvHeader
from .typing import FLVStream, FLVStreamItem from .typing import FLVStream, FLVStreamItem
@ -19,9 +20,17 @@ def defragment(min_tags: int = 10) -> Callable[[FLVStream], FLVStream]:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
gathering: bool = False gathering: bool = False
gathered_items: List[FLVStreamItem] = [] gathered_items: List[FLVStreamItem] = []
def reset() -> None:
nonlocal gathering, gathered_items
gathering = False
gathered_items = []
def on_next(item: FLVStreamItem) -> None: def on_next(item: FLVStreamItem) -> None:
nonlocal gathering nonlocal gathering
@ -47,10 +56,17 @@ def defragment(min_tags: int = 10) -> Callable[[FLVStream], FLVStream]:
else: else:
observer.on_next(item) observer.on_next(item)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
return _defragment return _defragment

View File

@ -3,6 +3,7 @@ import math
from typing import Callable, Optional from typing import Callable, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import ( from ..common import (
is_audio_tag, is_audio_tag,
@ -27,6 +28,9 @@ def fix() -> Callable[[FLVStream], FLVStream]:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
delta: int = 0 delta: int = 0
last_tag: Optional[FlvTag] = None last_tag: Optional[FlvTag] = None
last_audio_tag: Optional[AudioTag] = None last_audio_tag: Optional[AudioTag] = None
@ -143,10 +147,17 @@ def fix() -> Callable[[FLVStream], FLVStream]:
tag = correct_ts(tag) tag = correct_ts(tag)
observer.on_next(tag) observer.on_next(tag)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
return _fix return _fix

View File

@ -4,6 +4,7 @@ import logging
from typing import Optional from typing import Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import ( from ..common import (
is_audio_sequence_header, is_audio_sequence_header,
@ -51,6 +52,9 @@ class Limiter:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
def on_next(item: FLVStreamItem) -> None: def on_next(item: FLVStreamItem) -> None:
if isinstance(item, FlvHeader): if isinstance(item, FlvHeader):
self._reset() self._reset()
@ -62,10 +66,17 @@ class Limiter:
self._insert_header_and_tags(observer) self._insert_header_and_tags(observer)
observer.on_next(item) observer.on_next(item)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
def _insert_header_and_tags( def _insert_header_and_tags(

View File

@ -5,6 +5,7 @@ import logging
from typing import List, Optional, cast from typing import List, Optional, cast
from reactivex import Observable, Subject, abc from reactivex import Observable, Subject, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ...utils.ffprobe import StreamProfile, ffprobe from ...utils.ffprobe import StreamProfile, ffprobe
from ..common import find_aac_header_tag, find_avc_header_tag from ..common import find_aac_header_tag, find_avc_header_tag
@ -38,6 +39,9 @@ class Prober:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
self._reset() self._reset()
def on_next(item: FLVStreamItem) -> None: def on_next(item: FLVStreamItem) -> None:
@ -58,10 +62,17 @@ class Prober:
observer.on_next(item) observer.on_next(item)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
self._reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe) return Observable(subscribe)
def _do_probe(self) -> None: def _do_probe(self) -> None:

View File

@ -2,6 +2,7 @@ import logging
from typing import Callable, Optional from typing import Callable, Optional
from reactivex import Observable, abc from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable, SerialDisposable
from ..common import is_audio_sequence_header, is_metadata_tag, is_video_sequence_header from ..common import is_audio_sequence_header, is_metadata_tag, is_video_sequence_header
from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag from ..models import AudioTag, FlvHeader, ScriptTag, VideoTag
@ -21,6 +22,9 @@ def split() -> Callable[[FLVStream], FLVStream]:
observer: abc.ObserverBase[FLVStreamItem], observer: abc.ObserverBase[FLVStreamItem],
scheduler: Optional[abc.SchedulerBase] = None, scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase: ) -> abc.DisposableBase:
disposed = False
subscription = SerialDisposable()
changed: bool = False changed: bool = False
last_flv_header: Optional[FlvHeader] = None last_flv_header: Optional[FlvHeader] = None
last_metadata_tag: Optional[ScriptTag] = None last_metadata_tag: Optional[ScriptTag] = None
@ -91,10 +95,17 @@ def split() -> Callable[[FLVStream], FLVStream]:
observer.on_next(tag) observer.on_next(tag)
return source.subscribe( def dispose() -> None:
nonlocal disposed
disposed = True
reset()
subscription.disposable = source.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler on_next, observer.on_error, observer.on_completed, scheduler=scheduler
) )
return CompositeDisposable(subscription, Disposable(dispose))
return Observable(subscribe).pipe(correct()) return Observable(subscribe).pipe(correct())
return _split return _split