#pragma once
#include <concepts>
#include <functional>
#include <map>
#include <optional>
#include <type_traits>
#include <unordered_map>
#include <json/json.hpp>
#include "integrations/kafka/consumer.hpp"
#include "kvstore/kvstore.hpp"
#include "query/stream/common.hpp"
#include "query/stream/sources.hpp"
#include "query/typed_value.hpp"
#include "storage/v2/property_value.hpp"
#include "utils/event_counter.hpp"
#include "utils/exceptions.hpp"
#include "utils/rw_lock.hpp"
#include "utils/synchronized.hpp"
class StreamsTest;
namespace query {
struct InterpreterContext;
namespace stream {
class StreamsException : public utils::BasicException {
using BasicException::BasicException;
template <typename T>
struct StreamInfo;
template <>
struct StreamInfo<void> {
using Type = CommonStreamInfo;
template <Stream TStream>
struct StreamInfo<TStream> {
using Type = typename TStream::StreamInfo;
template <typename T>
using StreamInfoType = typename StreamInfo<T>::Type;
template <typename T = void>
struct StreamStatus {
std::string name;
StreamSourceType type;
bool is_running;
StreamInfoType<T> info;
std::optional<std::string> owner;
using TransformationResult = std::vector<std::vector<TypedValue>>;
/// Manages Kafka consumers.
/// This class is responsible for all query supported actions to happen.
class Streams final {
friend StreamsTest;
/// Initializes the streams.
/// @param interpreter_context context to use to run the result of transformations
/// @param directory a directory path to store the persisted streams metadata
Streams(InterpreterContext *interpreter_context, std::filesystem::path directory);
/// Restores the streams from the persisted metadata.
/// The restoration is done in a best effort manner, therefore no exception is thrown on failure, but the error is
/// logged. If a stream was running previously, then after restoration it will be started.
/// This function should only be called when there are no existing streams.
void RestoreStreams();
/// Creates a new import stream.
/// The create implies connecting to the server to get metadata necessary to initialize the stream. This
/// method assures there is no other stream with the same name.
/// @param stream_name the name of the stream which can be used to uniquely identify the stream
/// @param stream_info the necessary informations needed to create the Kafka consumer and transform the messages
/// @throws StreamsException if the stream with the same name exists or if the creation of Kafka consumer fails
template <Stream TStream>
void Create(const std::string &stream_name, typename TStream::StreamInfo info, std::optional<std::string> owner);
/// Deletes an existing stream and all the data that was persisted.
/// @param stream_name name of the stream that needs to be deleted.
/// @throws StreamsException if the stream doesn't exist or if the persisted metadata can't be deleted.
void Drop(const std::string &stream_name);
/// Start consuming from a stream.
/// @param stream_name name of the stream that needs to be started
/// @throws StreamsException if the stream doesn't exist or if the metadata cannot be persisted
/// @throws ConsumerRunningException if the consumer is already running
void Start(const std::string &stream_name);
/// Stop consuming from a stream.
/// @param stream_name name of the stream that needs to be stopped
/// @throws StreamsException if the stream doesn't exist or if the metadata cannot be persisted
/// @throws ConsumerStoppedException if the consumer is already stopped
void Stop(const std::string &stream_name);
/// Start consuming from all streams that are stopped.
/// @throws StreamsException if the metadata cannot be persisted
void StartAll();
/// Stop consuming from all streams that are running.
/// @throws StreamsException if the metadata cannot be persisted
void StopAll();
/// Return current status for all streams.
/// It might happend that the is_running field is out of date if the one of the streams stops during the invocation of
/// this function because of an error.
std::vector<StreamStatus<>> GetStreamInfo() const;
/// Do a dry-run consume from a stream.
/// @param stream_name name of the stream we want to test
/// @param batch_limit number of batches we want to test before stopping
/// @returns A vector of vectors of TypedValue. Each subvector contains two elements, the query string and the
/// nullable parameters map.
/// @throws StreamsException if the stream doesn't exist
/// @throws ConsumerRunningException if the consumer is alredy running
/// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing
TransformationResult Check(const std::string &stream_name,
std::optional<std::chrono::milliseconds> timeout = std::nullopt,
std::optional<int64_t> batch_limit = std::nullopt) const;
/// Return the configuration value passed to memgraph.
std::string_view BootstrapServers() const;
template <Stream TStream>
using SynchronizedStreamSource = utils::Synchronized<TStream, utils::WritePrioritizedRWLock>;
template <Stream TStream>
struct StreamData {
std::string transformation_name;
std::optional<std::string> owner;
std::unique_ptr<SynchronizedStreamSource<TStream>> stream_source;
using StreamDataVariant = std::variant<StreamData<KafkaStream>, StreamData<PulsarStream>>;
using StreamsMap = std::unordered_map<std::string, StreamDataVariant>;
using SynchronizedStreamsMap = utils::Synchronized<StreamsMap, utils::WritePrioritizedRWLock>;
template <Stream TStream>
StreamsMap::iterator CreateConsumer(StreamsMap &map, const std::string &stream_name,
typename TStream::StreamInfo stream_info, std::optional<std::string> owner);
template <Stream TStream>
void Persist(StreamStatus<TStream> &&status) {
const std::string stream_name =;
if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) {
throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name};
void RegisterProcedures();
void RegisterKafkaProcedures();
void RegisterPulsarProcedures();
InterpreterContext *interpreter_context_;
kvstore::KVStore storage_;
SynchronizedStreamsMap streams_;
} // namespace stream
} // namespace query