diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index dc30d9e86..ab26dc9d3 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -574,15 +574,22 @@ void Streams::RestoreStreams() { }; auto stream_json_data = nlohmann::json::parse(stream_data); - const auto stream_type = static_cast(stream_json_data.at("type")); - - switch (stream_type) { - case StreamSourceType::KAFKA: - create_consumer(StreamStatus{}, std::move(stream_json_data)); - break; - case StreamSourceType::PULSAR: - create_consumer(StreamStatus{}, std::move(stream_json_data)); - break; + if (const auto it = stream_json_data.find(kType); it != stream_json_data.end()) { + const auto stream_type = static_cast(*it); + switch (stream_type) { + case StreamSourceType::KAFKA: + create_consumer(StreamStatus{}, std::move(stream_json_data)); + break; + case StreamSourceType::PULSAR: + create_consumer(StreamStatus{}, std::move(stream_json_data)); + break; + } + } else { + spdlog::warn( + "Unable to load stream '{}', because it does not contain the type of the stream. Most probably the stream " + "was saved before Memgraph 2.1. Please recreate the stream manually to make it work. For more information " + "please check https://memgraph.com/docs/memgraph/changelog#v210---nov-22-2021 .", + stream_json_data.value(kStreamName, "")); } } }