Avoid crash in case of loading old stream (#302)

This commit is contained in:
János Benjamin Antal 2021-11-30 15:27:11 +01:00 committed by GitHub
parent aabec99a8e
commit 05d0aee494
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -574,15 +574,22 @@ void Streams::RestoreStreams() {
};
auto stream_json_data = nlohmann::json::parse(stream_data);
const auto stream_type = static_cast<StreamSourceType>(stream_json_data.at("type"));
switch (stream_type) {
case StreamSourceType::KAFKA:
create_consumer(StreamStatus<KafkaStream>{}, std::move(stream_json_data));
break;
case StreamSourceType::PULSAR:
create_consumer(StreamStatus<PulsarStream>{}, std::move(stream_json_data));
break;
if (const auto it = stream_json_data.find(kType); it != stream_json_data.end()) {
const auto stream_type = static_cast<StreamSourceType>(*it);
switch (stream_type) {
case StreamSourceType::KAFKA:
create_consumer(StreamStatus<KafkaStream>{}, std::move(stream_json_data));
break;
case StreamSourceType::PULSAR:
create_consumer(StreamStatus<PulsarStream>{}, std::move(stream_json_data));
break;
}
} else {
spdlog::warn(
"Unable to load stream '{}', because it does not contain the type of the stream. Most probably the stream "
"was saved before Memgraph 2.1. Please recreate the stream manually to make it work. For more information "
"please check https://memgraph.com/docs/memgraph/changelog#v210---nov-22-2021 .",
stream_json_data.value(kStreamName, "<invalid format>"));
}
}
}