refactor: refactor metadata injection
This commit is contained in:
parent
f6d1777662
commit
f0fa65f269
@ -20,10 +20,10 @@ class MetadataProvider:
|
||||
self._live = live
|
||||
self._stream_recorder = stream_recorder
|
||||
|
||||
def __call__(self) -> Dict[str, Any]:
|
||||
return self._make_metadata()
|
||||
def __call__(self, original_metadata: Dict[str, Any]) -> Dict[str, Any]:
|
||||
return self._make_metadata(original_metadata)
|
||||
|
||||
def _make_metadata(self) -> Dict[str, Any]:
|
||||
def _make_metadata(self, original_metadata: Dict[str, Any]) -> Dict[str, Any]:
|
||||
tz = timezone(timedelta(hours=8))
|
||||
live_start_time = datetime.fromtimestamp(
|
||||
self._live.room_info.live_start_time, tz
|
||||
|
@ -32,7 +32,7 @@ class MetadataDumper(SwitchableMixin):
|
||||
|
||||
def _do_enable(self) -> None:
|
||||
self._metadata_subscription = self._analyser.metadatas.subscribe(
|
||||
on_next=self._update_metadata, on_error=self._reset_metadata
|
||||
on_next=self._update_metadata
|
||||
)
|
||||
self._join_points_subscription = (
|
||||
self._joinpoint_extractor.join_points.subscribe(self._update_join_points)
|
||||
@ -51,12 +51,9 @@ class MetadataDumper(SwitchableMixin):
|
||||
self._file_closed_subscription.dispose()
|
||||
logger.debug('Disabled metadata dumper')
|
||||
|
||||
def _update_metadata(self, metadata: flv_ops.MetaData) -> None:
|
||||
def _update_metadata(self, metadata: Optional[flv_ops.MetaData]) -> None:
|
||||
self._last_metadata = metadata
|
||||
|
||||
def _reset_metadata(self, exc: Exception) -> None:
|
||||
self._last_metadata = None
|
||||
|
||||
def _update_join_points(self, join_points: List[flv_ops.JoinPoint]) -> None:
|
||||
self._last_join_points = join_points
|
||||
|
||||
|
@ -29,14 +29,23 @@ def inject_metadata(
|
||||
path: str, metadata: Dict[str, Any], *, show_progress: bool = False
|
||||
) -> Observable[InjectingProgress]:
|
||||
filesize = os.path.getsize(path)
|
||||
append_comment_for_joinpoints(metadata)
|
||||
|
||||
root, ext = os.path.splitext(path)
|
||||
temp_path = f'{root}_injecting{ext}'
|
||||
filename = os.path.basename(path)
|
||||
|
||||
def metadata_provider(original_metadata: Dict[str, Any]) -> Dict[str, Any]:
|
||||
if joinpoints := metadata.get('joinpoints'):
|
||||
join_points = map(JoinPoint.from_metadata_value, joinpoints)
|
||||
if comment := original_metadata.get('Comment'):
|
||||
comment += '\n' + make_comment_for_joinpoints(join_points)
|
||||
else:
|
||||
comment = make_comment_for_joinpoints(join_points)
|
||||
metadata['Comment'] = comment
|
||||
return metadata
|
||||
|
||||
return from_file(path).pipe(
|
||||
flv_ops.Injector(lambda: metadata),
|
||||
flv_ops.Injector(metadata_provider),
|
||||
flv_ops.Dumper(lambda: (temp_path, int(datetime.now().timestamp()))),
|
||||
flv_ops.ProgressBar(
|
||||
desc='Injecting',
|
||||
@ -44,17 +53,8 @@ def inject_metadata(
|
||||
total=filesize,
|
||||
disable=not show_progress,
|
||||
),
|
||||
ops.map(lambda i: len(i)),
|
||||
ops.scan(lambda acc, x: acc + x, 0),
|
||||
ops.map(lambda s: InjectingProgress(s, filesize)),
|
||||
ops.map(lambda i: len(i)), # type: ignore
|
||||
ops.scan(lambda acc, x: acc + x, 0), # type: ignore
|
||||
ops.map(lambda s: InjectingProgress(s, filesize)), # type: ignore
|
||||
utils_ops.replace(temp_path, path),
|
||||
)
|
||||
|
||||
|
||||
def append_comment_for_joinpoints(metadata: Dict[str, Any]) -> None:
|
||||
if join_points := metadata.get('joinpoints'):
|
||||
join_points = map(JoinPoint.from_metadata_value, join_points)
|
||||
if 'Comment' in metadata:
|
||||
metadata['Comment'] += '\n\n' + make_comment_for_joinpoints(join_points)
|
||||
else:
|
||||
metadata['Comment'] = make_comment_for_joinpoints(join_points)
|
||||
|
@ -102,7 +102,7 @@ class MetaDataDict:
|
||||
|
||||
class Analyser:
|
||||
def __init__(self) -> None:
|
||||
self._metadatas: Subject[MetaData] = Subject()
|
||||
self._metadatas: Subject[Optional[MetaData]] = Subject()
|
||||
self._reset()
|
||||
|
||||
def _reset(self) -> None:
|
||||
@ -245,7 +245,7 @@ class Analyser:
|
||||
metadata = self.make_metadata()
|
||||
except Exception as e:
|
||||
logger.warning(f'Failed to make metadata: {repr(e)}')
|
||||
self._metadatas.on_error(e)
|
||||
self._metadatas.on_next(None)
|
||||
else:
|
||||
self._metadatas.on_next(metadata)
|
||||
|
||||
|
@ -23,7 +23,9 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Injector:
|
||||
def __init__(self, metadata_provider: Callable[..., Dict[str, Any]]) -> None:
|
||||
def __init__(
|
||||
self, metadata_provider: Callable[[Dict[str, Any]], Dict[str, Any]]
|
||||
) -> None:
|
||||
self._metadata_provider = metadata_provider
|
||||
|
||||
def __call__(self, source: FLVStream) -> FLVStream:
|
||||
@ -67,7 +69,7 @@ class Injector:
|
||||
|
||||
def _inject_metadata(self, tag: ScriptTag) -> ScriptTag:
|
||||
old_metadata = parse_metadata(tag)
|
||||
new_metadata = self._metadata_provider()
|
||||
new_metadata = self._metadata_provider(old_metadata.copy())
|
||||
final_metadata = {
|
||||
**{'duration': 0.0, 'filesize': 0.0},
|
||||
**old_metadata,
|
||||
@ -88,6 +90,6 @@ class Injector:
|
||||
return new_tag
|
||||
|
||||
def _make_metadata_tag(self) -> ScriptTag:
|
||||
metadata = self._metadata_provider()
|
||||
metadata = self._metadata_provider({})
|
||||
metadata = {'duration': 0.0, 'filesize': 0.0, **metadata}
|
||||
return create_metadata_tag(metadata)
|
||||
|
Loading…
Reference in New Issue
Block a user