From 12f4e0068a8c92f512510a0398ab17a4c0e319f3 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 16 Nov 2021 16:12:38 +0100 Subject: [PATCH] Small polishing and fixes --- include/mg_procedure.h | 3 + include/mgp.py | 6 + libs/pulsar.patch | 42 +- src/query/procedure/mg_procedure_impl.cpp | 15 +- src/query/stream/streams.cpp | 34 +- src/query/streams.cpp | 466 --------------- tests/e2e/streams/kafka_streams_tests.py | 10 +- tests/e2e/streams/streams_tests.py | 552 ------------------ .../transformations/kafka_transform.py | 2 +- 9 files changed, 60 insertions(+), 1070 deletions(-) delete mode 100644 src/query/streams.cpp delete mode 100755 tests/e2e/streams/streams_tests.py diff --git a/include/mg_procedure.h b/include/mg_procedure.h index ce20d9b8d..bf47d9477 100644 --- a/include/mg_procedure.h +++ b/include/mg_procedure.h @@ -1470,6 +1470,9 @@ enum mgp_error mgp_message_key_size(struct mgp_message *message, size_t *result) enum mgp_error mgp_message_timestamp(struct mgp_message *message, int64_t *result); /// Get the message offset from a message. +/// Supported stream sources: +/// - Kafka +/// Return MGP_ERROR_INVALID_ARGUMENT if the message is from an unsupported stream source. enum mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result); /// Get the number of messages contained in the mgp_messages list diff --git a/include/mgp.py b/include/mgp.py index 81913ac39..ec2262644 100644 --- a/include/mgp.py +++ b/include/mgp.py @@ -1340,6 +1340,12 @@ class Message: return self._message.timestamp() def offset(self) -> int: + """ + Supported stream sources: + - Kafka + + Raise InvalidArgumentError if the message is from an unsupported stream source. + """ if not self.is_valid(): raise InvalidMessageError() return self._message.offset() diff --git a/libs/pulsar.patch b/libs/pulsar.patch index 870e54375..95111d7f7 100644 --- a/libs/pulsar.patch +++ b/libs/pulsar.patch @@ -60,7 +60,7 @@ index 508e4f4..87c5f2a 100644 decompressed.bytesWritten(uncompressedSize); decoded = decompressed; diff --git a/pulsar-client-cpp/lib/lz4/lz4.c b/pulsar-client-cpp/lib/lz4/lz4.c -index 08cf6b5..d74c287 100644 +index 08cf6b5..07d3e01 100644 --- a/pulsar-client-cpp/lib/lz4/lz4.c +++ b/pulsar-client-cpp/lib/lz4/lz4.c @@ -45,7 +45,7 @@ @@ -174,7 +174,7 @@ index 08cf6b5..d74c287 100644 #define KB *(1 <<10) #define MB *(1 <<20) -@@ -239,7 +239,7 @@ static const int LZ4_minLength = (MFLIMIT+1); +@@ -239,15 +239,15 @@ static const int LZ4_minLength = (MFLIMIT+1); /************************************** * Common Utils **************************************/ @@ -183,9 +183,10 @@ index 08cf6b5..d74c287 100644 /************************************** -@@ -247,7 +247,7 @@ static const int LZ4_minLength = (MFLIMIT+1); + * Common functions **************************************/ - static unsigned LZ4_NbCommonBytes (register size_t val) +-static unsigned LZ4_NbCommonBytes (register size_t val) ++static unsigned PULSAR_LZ4_NbCommonBytes (register size_t val) { - if (LZ4_isLittleEndian()) + if (PULSAR_LZ4_isLittleEndian()) @@ -206,7 +207,8 @@ index 08cf6b5..d74c287 100644 - size_t diff = LZ4_read_ARCH(pMatch) ^ LZ4_read_ARCH(pIn); + size_t diff = PULSAR_LZ4_read_ARCH(pMatch) ^ PULSAR_LZ4_read_ARCH(pIn); if (!diff) { pIn+=STEPSIZE; pMatch+=STEPSIZE; continue; } - pIn += LZ4_NbCommonBytes(diff); +- pIn += LZ4_NbCommonBytes(diff); ++ pIn += PULSAR_LZ4_NbCommonBytes(diff); return (unsigned)(pIn - pStart); } @@ -217,11 +219,13 @@ index 08cf6b5..d74c287 100644 if ((pIn compression run slower on incompressible data 
*/ ++static const int PULSAR_LZ4_64Klimit = ((64 KB) + (MFLIMIT-1)); +static const U32 PULSAR_LZ4_skipTrigger = 6; /* Increase this value ==> compression run slower on incompressible data */ @@ -331,11 +335,13 @@ index 08cf6b5..d74c287 100644 const BYTE* ip = (const BYTE*) source; const BYTE* base; -@@ -483,11 +483,11 @@ FORCE_INLINE int LZ4_compress_generic( +@@ -482,12 +482,12 @@ FORCE_INLINE int LZ4_compress_generic( + lowLimit = (const BYTE*)source; break; } - if ((tableType == byU16) && (inputSize>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ +- if ((tableType == byU16) && (inputSize>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ - if (inputSize=PULSAR_LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ + if (inputSize= LZ4_compressBound(inputSize)) + if (maxOutputSize >= PULSAR_LZ4_compressBound(inputSize)) { - if (inputSize < LZ4_64Klimit) +- if (inputSize < LZ4_64Klimit) - return LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, byU16, noDict, noDictIssue, acceleration); ++ if (inputSize < PULSAR_LZ4_64Klimit) + return PULSAR_LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, byU16, noDict, noDictIssue, acceleration); else - return LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, LZ4_64bits() ? byU32 : byPtr, noDict, noDictIssue, acceleration); @@ -476,8 +483,9 @@ index 08cf6b5..d74c287 100644 } else { - if (inputSize < LZ4_64Klimit) +- if (inputSize < LZ4_64Klimit) - return LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration); ++ if (inputSize < PULSAR_LZ4_64Klimit) + return PULSAR_LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration); else - return LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, LZ4_64bits() ? byU32 : byPtr, noDict, noDictIssue, acceleration); @@ -526,8 +534,9 @@ index 08cf6b5..d74c287 100644 - LZ4_resetStream(&ctx); + PULSAR_LZ4_resetStream(&ctx); - if (inputSize < LZ4_64Klimit) +- if (inputSize < LZ4_64Klimit) - return LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration); ++ if (inputSize < PULSAR_LZ4_64Klimit) + return PULSAR_LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration); else - return LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, LZ4_64bits() ? 
byU32 : byPtr, noDict, noDictIssue, acceleration); @@ -544,11 +553,13 @@ index 08cf6b5..d74c287 100644 void* const ctx, const char* const src, char* const dst, -@@ -748,12 +748,12 @@ static int LZ4_compress_destSize_generic( +@@ -747,13 +747,13 @@ static int LZ4_compress_destSize_generic( + /* Init conditions */ if (targetDstSize < 1) return 0; /* Impossible to store anything */ if ((U32)*srcSizePtr > (U32)LZ4_MAX_INPUT_SIZE) return 0; /* Unsupported input size, too large (or negative) */ - if ((tableType == byU16) && (*srcSizePtr>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ +- if ((tableType == byU16) && (*srcSizePtr>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ - if (*srcSizePtr=PULSAR_LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */ + if (*srcSizePtr(T &&msg) -> int64_t { - using MessageType = std::decay_t; - if constexpr (std::same_as) { - return msg->Offset(); - } else { - throw std::invalid_argument("Invalid source type"); - } - }, - message->msg); + return std::visit(utils::Overloaded{[](const mgp_message::KafkaMessage &msg) { return msg->Offset(); }, + [](const auto &msg) -> int64_t { + throw InvalidMessageFunction(MessageToStreamSourceType(msg), "offset"); + }}, + message->msg); }, result); } diff --git a/src/query/stream/streams.cpp b/src/query/stream/streams.cpp index a4b769b0d..223fdc54f 100644 --- a/src/query/stream/streams.cpp +++ b/src/query/stream/streams.cpp @@ -31,6 +31,7 @@ #include "utils/memory.hpp" #include "utils/on_scope_exit.hpp" #include "utils/pmr/string.hpp" +#include "utils/variant_helpers.hpp" namespace EventCounter { extern const Event MessagesConsumed; @@ -156,15 +157,6 @@ void from_json(const nlohmann::json &data, StreamStatus &status) { from_json(data, status.info); } -namespace { -template -struct Overloaded : Ts... { - using Ts::operator()...; -}; -template -Overloaded(Ts...) 
-> Overloaded; -} // namespace - Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path directory) : interpreter_context_(interpreter_context), storage_(std::move(directory)) { constexpr std::string_view proc_name = "kafka_set_stream_offset"; @@ -176,18 +168,18 @@ Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path const auto offset = procedure::Call(mgp_value_get_int, arg_offset); auto lock_ptr = streams_.Lock(); auto it = GetStream(*lock_ptr, std::string(stream_name)); - std::visit(Overloaded{[&](StreamData &kafka_stream) { - auto stream_source_ptr = kafka_stream.stream_source->Lock(); - const auto error = stream_source_ptr->SetStreamOffset(offset); - if (error.HasError()) { - MG_ASSERT( - mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR, - "Unable to set procedure error message of procedure: {}", proc_name); - } - }, - [proc_name](auto && /*other*/) { - throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name); - }}, + std::visit(utils::Overloaded{ + [&](StreamData &kafka_stream) { + auto stream_source_ptr = kafka_stream.stream_source->Lock(); + const auto error = stream_source_ptr->SetStreamOffset(offset); + if (error.HasError()) { + MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR, + "Unable to set procedure error message of procedure: {}", proc_name); + } + }, + [proc_name](auto && /*other*/) { + throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name); + }}, it->second); }; diff --git a/src/query/streams.cpp b/src/query/streams.cpp deleted file mode 100644 index c8220e177..000000000 --- a/src/query/streams.cpp +++ /dev/null @@ -1,466 +0,0 @@ -// Copyright 2021 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. 
- -#include "query/streams.hpp" - -#include -#include -#include - -#include -#include -#include "query/db_accessor.hpp" -#include "query/discard_value_stream.hpp" -#include "query/interpreter.hpp" -#include "query/procedure//mg_procedure_helpers.hpp" -#include "query/procedure/mg_procedure_impl.hpp" -#include "query/procedure/module.hpp" -#include "query/typed_value.hpp" -#include "utils/event_counter.hpp" -#include "utils/memory.hpp" -#include "utils/on_scope_exit.hpp" -#include "utils/pmr/string.hpp" - -namespace EventCounter { -extern const Event MessagesConsumed; -} // namespace EventCounter - -namespace query { - -using Consumer = integrations::kafka::Consumer; -using ConsumerInfo = integrations::kafka::ConsumerInfo; -using Message = integrations::kafka::Message; -namespace { -constexpr auto kExpectedTransformationResultSize = 2; -const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()}; -const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()}; -const std::map empty_parameters{}; - -auto GetStream(auto &map, const std::string &stream_name) { - if (auto it = map.find(stream_name); it != map.end()) { - return it; - } - throw StreamsException("Couldn't find stream '{}'", stream_name); -} - -void CallCustomTransformation(const std::string &transformation_name, const std::vector &messages, - mgp_result &result, storage::Storage::Accessor &storage_accessor, - utils::MemoryResource &memory_resource, const std::string &stream_name) { - DbAccessor db_accessor{&storage_accessor}; - { - auto maybe_transformation = - procedure::FindTransformation(procedure::gModuleRegistry, transformation_name, utils::NewDeleteResource()); - - if (!maybe_transformation) { - throw StreamsException("Couldn't find transformation {} for stream '{}'", transformation_name, stream_name); - }; - const auto &trans = *maybe_transformation->second; - mgp_messages mgp_messages{mgp_messages::storage_type{&memory_resource}}; - std::transform(messages.begin(), messages.end(), std::back_inserter(mgp_messages.messages), - [](const integrations::kafka::Message &message) { return mgp_message{&message}; }); - mgp_graph graph{&db_accessor, storage::View::OLD, nullptr}; - mgp_memory memory{&memory_resource}; - result.rows.clear(); - result.error_msg.reset(); - result.signature = &trans.results; - - MG_ASSERT(result.signature->size() == kExpectedTransformationResultSize); - MG_ASSERT(result.signature->contains(query_param_name)); - MG_ASSERT(result.signature->contains(params_param_name)); - - spdlog::trace("Calling transformation in stream '{}'", stream_name); - trans.cb(&mgp_messages, &graph, &result, &memory); - } - if (result.error_msg.has_value()) { - throw StreamsException(result.error_msg->c_str()); - } -} - -std::pair ExtractTransformationResult( - utils::pmr::map &&values, const std::string_view transformation_name, - const std::string_view stream_name) { - if (values.size() != kExpectedTransformationResultSize) { - throw StreamsException( - "Transformation '{}' in stream '{}' did not yield all fields (query, parameters) as required.", - transformation_name, stream_name); - } - - auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & { - auto it = values.find(field_name); - if (it == values.end()) { - throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.", - transformation_name, stream_name, field_name}; - }; - return it->second; - }; - - auto &query_value = get_value(query_param_name); - 
MG_ASSERT(query_value.IsString()); - auto ¶ms_value = get_value(params_param_name); - MG_ASSERT(params_value.IsNull() || params_value.IsMap()); - return {std::move(query_value), std::move(params_value)}; -} -} // namespace - -// nlohmann::json doesn't support string_view access yet -const std::string kStreamName{"name"}; -const std::string kTopicsKey{"topics"}; -const std::string kConsumerGroupKey{"consumer_group"}; -const std::string kBatchIntervalKey{"batch_interval"}; -const std::string kBatchSizeKey{"batch_size"}; -const std::string kIsRunningKey{"is_running"}; -const std::string kTransformationName{"transformation_name"}; -const std::string kOwner{"owner"}; -const std::string kBoostrapServers{"bootstrap_servers"}; - -void to_json(nlohmann::json &data, StreamStatus &&status) { - auto &info = status.info; - data[kStreamName] = std::move(status.name); - data[kTopicsKey] = std::move(info.topics); - data[kConsumerGroupKey] = info.consumer_group; - - if (info.batch_interval) { - data[kBatchIntervalKey] = info.batch_interval->count(); - } else { - data[kBatchIntervalKey] = nullptr; - } - - if (info.batch_size) { - data[kBatchSizeKey] = *info.batch_size; - } else { - data[kBatchSizeKey] = nullptr; - } - - data[kIsRunningKey] = status.is_running; - data[kTransformationName] = status.info.transformation_name; - - if (info.owner.has_value()) { - data[kOwner] = std::move(*info.owner); - } else { - data[kOwner] = nullptr; - } - - data[kBoostrapServers] = std::move(info.bootstrap_servers); -} - -void from_json(const nlohmann::json &data, StreamStatus &status) { - auto &info = status.info; - data.at(kStreamName).get_to(status.name); - data.at(kTopicsKey).get_to(info.topics); - data.at(kConsumerGroupKey).get_to(info.consumer_group); - - if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) { - using BatchInterval = decltype(info.batch_interval)::value_type; - info.batch_interval = BatchInterval{batch_interval.get()}; - } else { - info.batch_interval = {}; - } - - if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) { - info.batch_size = batch_size.get(); - } else { - info.batch_size = {}; - } - - data.at(kIsRunningKey).get_to(status.is_running); - data.at(kTransformationName).get_to(status.info.transformation_name); - - if (const auto &owner = data.at(kOwner); !owner.is_null()) { - info.owner = owner.get(); - } else { - info.owner = {}; - } - - info.owner = data.value(kBoostrapServers, ""); -} - -Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers, - std::filesystem::path directory) - : interpreter_context_(interpreter_context), - bootstrap_servers_(std::move(bootstrap_servers)), - storage_(std::move(directory)) { - constexpr std::string_view proc_name = "kafka_set_stream_offset"; - auto set_stream_offset = [ictx = interpreter_context, proc_name](mgp_list *args, mgp_graph * /*graph*/, - mgp_result *result, mgp_memory * /*memory*/) { - auto *arg_stream_name = procedure::Call(mgp_list_at, args, 0); - const auto *stream_name = procedure::Call(mgp_value_get_string, arg_stream_name); - auto *arg_offset = procedure::Call(mgp_list_at, args, 1); - const auto offset = procedure::Call(mgp_value_get_int, arg_offset); - const auto error = ictx->streams.SetStreamOffset(stream_name, offset); - if (error.HasError()) { - MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR, - "Unable to set procedure error message of procedure: {}", proc_name); - } - }; - - mgp_proc proc(proc_name, 
set_stream_offset, utils::NewDeleteResource(), false); - MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call(mgp_type_string)) == MGP_ERROR_NO_ERROR); - MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call(mgp_type_int)) == MGP_ERROR_NO_ERROR); - - procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc)); -} - -void Streams::RestoreStreams() { - spdlog::info("Loading streams..."); - auto locked_streams_map = streams_.Lock(); - MG_ASSERT(locked_streams_map->empty(), "Cannot restore streams when some streams already exist!"); - - for (const auto &[stream_name, stream_data] : storage_) { - const auto get_failed_message = [&stream_name = stream_name](const std::string_view message, - const std::string_view nested_message) { - return fmt::format("Failed to load stream '{}', because: {} caused by {}", stream_name, message, nested_message); - }; - - StreamStatus status; - try { - nlohmann::json::parse(stream_data).get_to(status); - } catch (const nlohmann::json::type_error &exception) { - spdlog::warn(get_failed_message("invalid type conversion", exception.what())); - continue; - } catch (const nlohmann::json::out_of_range &exception) { - spdlog::warn(get_failed_message("non existing field", exception.what())); - continue; - } - MG_ASSERT(status.name == stream_name, "Expected stream name is '{}', but got '{}'", status.name, stream_name); - - try { - auto it = CreateConsumer(*locked_streams_map, stream_name, std::move(status.info)); - if (status.is_running) { - it->second.consumer->Lock()->Start(); - } - spdlog::info("Stream '{}' is loaded", stream_name); - } catch (const utils::BasicException &exception) { - spdlog::warn(get_failed_message("unexpected error", exception.what())); - } - } -} - -void Streams::Create(const std::string &stream_name, StreamInfo info) { - auto locked_streams = streams_.Lock(); - auto it = CreateConsumer(*locked_streams, stream_name, std::move(info)); - - try { - Persist( - CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *it->second.consumer->ReadLock())); - } catch (...) { - locked_streams->erase(it); - throw; - } -} - -void Streams::Drop(const std::string &stream_name) { - auto locked_streams = streams_.Lock(); - - auto it = GetStream(*locked_streams, stream_name); - - // streams_ is write locked, which means there is no access to it outside of this function, thus only the Test - // function can be executing with the consumer, nothing else. - // By acquiring the write lock here for the consumer, we make sure there is - // no running Test function for this consumer, therefore it can be erased. 
- it->second.consumer->Lock(); - locked_streams->erase(it); - - if (!storage_.Delete(stream_name)) { - throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name); - } - - // TODO(antaljanosbenjamin) Release the transformation -} - -void Streams::Start(const std::string &stream_name) { - auto locked_streams = streams_.Lock(); - auto it = GetStream(*locked_streams, stream_name); - - auto locked_consumer = it->second.consumer->Lock(); - locked_consumer->Start(); - - Persist(CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *locked_consumer)); -} - -void Streams::Stop(const std::string &stream_name) { - auto locked_streams = streams_.Lock(); - auto it = GetStream(*locked_streams, stream_name); - - auto locked_consumer = it->second.consumer->Lock(); - locked_consumer->Stop(); - - Persist(CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *locked_consumer)); -} - -void Streams::StartAll() { - for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { - auto locked_consumer = stream_data.consumer->Lock(); - if (!locked_consumer->IsRunning()) { - locked_consumer->Start(); - Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_consumer)); - } - } -} - -void Streams::StopAll() { - for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) { - auto locked_consumer = stream_data.consumer->Lock(); - if (locked_consumer->IsRunning()) { - locked_consumer->Stop(); - Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_consumer)); - } - } -} - -std::vector Streams::GetStreamInfo() const { - std::vector result; - { - for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) { - result.emplace_back(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, - *stream_data.consumer->ReadLock())); - } - } - return result; -} - -TransformationResult Streams::Check(const std::string &stream_name, std::optional timeout, - std::optional batch_limit) const { - // This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after that - auto [locked_consumer, - transformation_name] = [this, &stream_name]() -> std::pair { - auto locked_streams = streams_.ReadLock(); - auto it = GetStream(*locked_streams, stream_name); - return {it->second.consumer->ReadLock(), it->second.transformation_name}; - }(); - - auto *memory_resource = utils::NewDeleteResource(); - mgp_result result{nullptr, memory_resource}; - TransformationResult test_result; - - auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, &stream_name, - &transformation_name = transformation_name, &result, - &test_result](const std::vector &messages) mutable { - auto accessor = interpreter_context->db->Access(); - CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); - - for (auto &row : result.rows) { - auto [query, parameters] = ExtractTransformationResult(std::move(row.values), transformation_name, stream_name); - std::vector result_row; - result_row.reserve(kExpectedTransformationResultSize); - result_row.push_back(std::move(query)); - result_row.push_back(std::move(parameters)); - - test_result.push_back(std::move(result_row)); - } - }; - - locked_consumer->Check(timeout, batch_limit, consumer_function); - - return test_result; -} - 
-StreamStatus Streams::CreateStatus(const std::string &name, const std::string &transformation_name, - const std::optional &owner, - const integrations::kafka::Consumer &consumer) { - const auto &info = consumer.Info(); - return StreamStatus{name, - StreamInfo{ - info.topics, - info.consumer_group, - info.batch_interval, - info.batch_size, - transformation_name, - owner, - }, - consumer.IsRunning()}; -} - -Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std::string &stream_name, - StreamInfo stream_info) { - if (map.contains(stream_name)) { - throw StreamsException{"Stream already exists with name '{}'", stream_name}; - } - - auto *memory_resource = utils::NewDeleteResource(); - - auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, stream_name, - transformation_name = stream_info.transformation_name, owner = stream_info.owner, - interpreter = std::make_shared(interpreter_context_), - result = mgp_result{nullptr, memory_resource}]( - const std::vector &messages) mutable { - auto accessor = interpreter_context->db->Access(); - EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size()); - CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name); - - DiscardValueResultStream stream; - - spdlog::trace("Start transaction in stream '{}'", stream_name); - utils::OnScopeExit cleanup{[&interpreter, &result]() { - result.rows.clear(); - interpreter->Abort(); - }}; - interpreter->BeginTransaction(); - - for (auto &row : result.rows) { - spdlog::trace("Processing row in stream '{}'", stream_name); - auto [query_value, params_value] = - ExtractTransformationResult(std::move(row.values), transformation_name, stream_name); - storage::PropertyValue params_prop{params_value}; - - std::string query{query_value.ValueString()}; - spdlog::trace("Executing query '{}' in stream '{}'", query, stream_name); - auto prepare_result = - interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr); - if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) { - throw StreamsException{ - "Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the " - "query!", - query, stream_name}; - } - interpreter->PullAll(&stream); - } - - spdlog::trace("Commit transaction in stream '{}'", stream_name); - interpreter->CommitTransaction(); - result.rows.clear(); - }; - - ConsumerInfo consumer_info{ - .consumer_name = stream_name, - .topics = std::move(stream_info.topics), - .consumer_group = std::move(stream_info.consumer_group), - .batch_interval = stream_info.batch_interval, - .batch_size = stream_info.batch_size, - }; - - auto bootstrap_servers = - stream_info.bootstrap_servers.empty() ? 
bootstrap_servers_ : std::move(stream_info.bootstrap_servers); - auto insert_result = map.insert_or_assign( - stream_name, - StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner), - std::make_unique(std::move(bootstrap_servers), std::move(consumer_info), - std::move(consumer_function))}); - MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name); - return insert_result.first; -} - -void Streams::Persist(StreamStatus &&status) { - const std::string stream_name = status.name; - if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) { - throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name}; - } -} - -std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; } - -utils::BasicResult Streams::SetStreamOffset(const std::string_view stream_name, int64_t offset) { - auto lock_ptr = streams_.Lock(); - auto it = GetStream(*lock_ptr, std::string(stream_name)); - auto consumer_lock_ptr = it->second.consumer->Lock(); - return consumer_lock_ptr->SetConsumerOffsets(offset); -} - -} // namespace query diff --git a/tests/e2e/streams/kafka_streams_tests.py b/tests/e2e/streams/kafka_streams_tests.py index 8206222df..db9fbdd25 100755 --- a/tests/e2e/streams/kafka_streams_tests.py +++ b/tests/e2e/streams/kafka_streams_tests.py @@ -353,20 +353,20 @@ def test_bootstrap_server_empty( @pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_set_offset(producer, topics, connection, transformation): - assert len(topics) > 0 +def test_set_offset(kafka_producer, kafka_topics, connection, transformation): + assert len(kafka_topics) > 0 cursor = connection.cursor() common.execute_and_fetch_all( cursor, "CREATE KAFKA STREAM test " - f"TOPICS {topics[0]} " + f"TOPICS {kafka_topics[0]} " f"TRANSFORM {transformation} " "BATCH_SIZE 1", ) messages = [f"{i} message" for i in range(1, 21)] for message in messages: - producer.send(topics[0], message.encode()).get(timeout=60) + kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=60) def consume(expected_msgs): common.start_stream(cursor, "test") @@ -415,7 +415,7 @@ def test_set_offset(producer, topics, connection, transformation): res = execute_set_offset_and_consume(-2, []) assert len(res) == 0 last_msg = "Final Message" - producer.send(topics[0], last_msg.encode()).get(timeout=60) + kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=60) res = consume([last_msg]) assert len(res) == 1 assert comparison_check("Final Message", res[0]) diff --git a/tests/e2e/streams/streams_tests.py b/tests/e2e/streams/streams_tests.py deleted file mode 100755 index a573a0cb2..000000000 --- a/tests/e2e/streams/streams_tests.py +++ /dev/null @@ -1,552 +0,0 @@ -#!/usr/bin/python3 - -# Copyright 2021 Memgraph Ltd. -# -# Use of this software is governed by the Business Source License -# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -# License, and you may not use this file except in compliance with the Business Source License. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0, included in the file -# licenses/APL.txt. 
- -import sys -import pytest -import mgclient -import time -from multiprocessing import Process, Value -import common - -# These are the indices of the query and parameters in the result of CHECK -# STREAM query -QUERY = 0 -PARAMS = 1 - -TRANSFORMATIONS_TO_CHECK = ["transform.simple", "transform.with_parameters"] - -SIMPLE_MSG = b"message" - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_simple(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {','.join(topics)} " - f"TRANSFORM {transformation}", - ) - common.start_stream(cursor, "test") - time.sleep(5) - - for topic in topics: - producer.send(topic, SIMPLE_MSG).get(timeout=60) - - for topic in topics: - common.check_vertex_exists_with_topic_and_payload( - cursor, topic, SIMPLE_MSG - ) - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_separate_consumers(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - - stream_names = [] - for topic in topics: - stream_name = "stream_" + topic - stream_names.append(stream_name) - common.execute_and_fetch_all( - cursor, - f"CREATE KAFKA STREAM {stream_name} " - f"TOPICS {topic} " - f"TRANSFORM {transformation}", - ) - - for stream_name in stream_names: - common.start_stream(cursor, stream_name) - - time.sleep(5) - - for topic in topics: - producer.send(topic, SIMPLE_MSG).get(timeout=60) - - for topic in topics: - common.check_vertex_exists_with_topic_and_payload( - cursor, topic, SIMPLE_MSG - ) - - -def test_start_from_last_committed_offset(producer, topics, connection): - # This test creates a stream, consumes a message to have a committed - # offset, then destroys the stream. A new message is sent before the - # stream is recreated and then restarted. This simulates when Memgraph is - # stopped (stream is destroyed) and then restarted (stream is recreated). - # This is of course not as good as restarting memgraph would be, but - # restarting Memgraph during a single workload cannot be done currently. 
- assert len(topics) > 0 - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {topics[0]} " - "TRANSFORM kafka_transform.simple", - ) - common.start_stream(cursor, "test") - time.sleep(1) - - producer.send(topics[0], SIMPLE_MSG).get(timeout=60) - - common.check_vertex_exists_with_topic_and_payload( - cursor, topics[0], SIMPLE_MSG - ) - - common.stop_stream(cursor, "test") - common.drop_stream(cursor, "test") - - messages = [b"second message", b"third message"] - for message in messages: - producer.send(topics[0], message).get(timeout=60) - - for message in messages: - vertices_with_msg = common.execute_and_fetch_all( - cursor, - "MATCH (n: MESSAGE {" - f"payload: '{message.decode('utf-8')}'" - "}) RETURN n", - ) - - assert len(vertices_with_msg) == 0 - - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {topics[0]} " - "TRANSFORM kafka_transform.simple", - ) - common.start_stream(cursor, "test") - - for message in messages: - common.check_vertex_exists_with_topic_and_payload( - cursor, topics[0], message - ) - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_check_stream(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {topics[0]} " - f"TRANSFORM {transformation} " - "BATCH_SIZE 1", - ) - common.start_stream(cursor, "test") - time.sleep(1) - - producer.send(topics[0], SIMPLE_MSG).get(timeout=60) - common.stop_stream(cursor, "test") - - messages = [b"first message", b"second message", b"third message"] - for message in messages: - producer.send(topics[0], message).get(timeout=60) - - def check_check_stream(batch_limit): - assert ( - transformation == "transform.simple" - or transformation == "transform.with_parameters" - ) - test_results = common.execute_and_fetch_all( - cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}" - ) - assert len(test_results) == batch_limit - - for i in range(batch_limit): - message_as_str = messages[i].decode("utf-8") - if transformation == "transform.simple": - assert f"payload: '{message_as_str}'" in test_results[i][QUERY] - assert test_results[i][PARAMS] is None - else: - assert test_results[i][QUERY] == ( - "CREATE (n:MESSAGE " - "{timestamp: $timestamp, " - "payload: $payload, " - "topic: $topic, " - "offset: $offset})" - ) - parameters = test_results[i][PARAMS] - # this is not a very sofisticated test, but checks if - # timestamp has some kind of value - assert parameters["timestamp"] > 1000000000000 - assert parameters["topic"] == topics[0] - assert parameters["payload"] == message_as_str - - check_check_stream(1) - check_check_stream(2) - check_check_stream(3) - common.start_stream(cursor, "test") - - for message in messages: - common.check_vertex_exists_with_topic_and_payload( - cursor, topics[0], message - ) - - -def test_show_streams(producer, topics, connection): - assert len(topics) > 1 - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM default_values " - f"TOPICS {topics[0]} " - f"TRANSFORM transform.simple " - f"BOOTSTRAP_SERVERS 'localhost:9092'", - ) - - consumer_group = "my_special_consumer_group" - batch_interval = 42 - batch_size = 3 - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM complex_values " - f"TOPICS {','.join(topics)} " - f"TRANSFORM transform.with_parameters " - f"CONSUMER_GROUP {consumer_group} " - 
f"BATCH_INTERVAL {batch_interval} " - f"BATCH_SIZE {batch_size} ", - ) - - assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2 - - common.check_stream_info( - cursor, - "default_values", - ("default_values", None, None, "transform.simple", None, False), - ) - - common.check_stream_info( - cursor, - "complex_values", - ( - "complex_values", - batch_interval, - batch_size, - "transform.with_parameters", - None, - False, - ), - ) - - -@pytest.mark.parametrize("operation", ["START", "STOP"]) -def test_start_and_stop_during_check(producer, topics, connection, operation): - # This test is quite complex. The goal is to call START/STOP queries - # while a CHECK query is waiting for its result. Because the Global - # Interpreter Lock, running queries on multiple threads is not useful, - # because only one of them can call Cursor::execute at a time. Therefore - # multiple processes are used to execute the queries, because different - # processes have different GILs. - # The counter variables are thread- and process-safe variables to - # synchronize between the different processes. Each value represents a - # specific phase of the execution of the processes. - assert len(topics) > 1 - assert operation == "START" or operation == "STOP" - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test_stream " - f"TOPICS {topics[0]} " - f"TRANSFORM transform.simple", - ) - - check_counter = Value("i", 0) - check_result_len = Value("i", 0) - operation_counter = Value("i", 0) - - CHECK_BEFORE_EXECUTE = 1 - CHECK_AFTER_FETCHALL = 2 - CHECK_CORRECT_RESULT = 3 - CHECK_INCORRECT_RESULT = 4 - - def call_check(counter, result_len): - # This process will call the CHECK query and increment the counter - # based on its progress and expected behavior - connection = common.connect() - cursor = connection.cursor() - counter.value = CHECK_BEFORE_EXECUTE - result = common.execute_and_fetch_all( - cursor, "CHECK STREAM test_stream" - ) - result_len.value = len(result) - counter.value = CHECK_AFTER_FETCHALL - if len(result) > 0 and "payload: 'message'" in result[0][QUERY]: - counter.value = CHECK_CORRECT_RESULT - else: - counter.value = CHECK_INCORRECT_RESULT - - OP_BEFORE_EXECUTE = 1 - OP_AFTER_FETCHALL = 2 - OP_ALREADY_STOPPED_EXCEPTION = 3 - OP_INCORRECT_ALREADY_STOPPED_EXCEPTION = 4 - OP_UNEXPECTED_EXCEPTION = 5 - - def call_operation(counter): - # This porcess will call the query with the specified operation and - # increment the counter based on its progress and expected behavior - connection = common.connect() - cursor = connection.cursor() - counter.value = OP_BEFORE_EXECUTE - try: - common.execute_and_fetch_all( - cursor, f"{operation} STREAM test_stream" - ) - counter.value = OP_AFTER_FETCHALL - except mgclient.DatabaseError as e: - if "Kafka consumer test_stream is already stopped" in str(e): - counter.value = OP_ALREADY_STOPPED_EXCEPTION - else: - counter.value = OP_INCORRECT_ALREADY_STOPPED_EXCEPTION - except Exception: - counter.value = OP_UNEXPECTED_EXCEPTION - - check_stream_proc = Process( - target=call_check, daemon=True, args=(check_counter, check_result_len) - ) - operation_proc = Process( - target=call_operation, daemon=True, args=(operation_counter,) - ) - - try: - check_stream_proc.start() - - time.sleep(0.5) - - assert common.timed_wait( - lambda: check_counter.value == CHECK_BEFORE_EXECUTE - ) - assert common.timed_wait( - lambda: common.get_is_running(cursor, "test_stream") - ) - assert check_counter.value == CHECK_BEFORE_EXECUTE, ( - "SHOW 
STREAMS " "was blocked until the end of CHECK STREAM" - ) - operation_proc.start() - assert common.timed_wait( - lambda: operation_counter.value == OP_BEFORE_EXECUTE - ) - - producer.send(topics[0], SIMPLE_MSG).get(timeout=60) - assert common.timed_wait( - lambda: check_counter.value > CHECK_AFTER_FETCHALL - ) - assert check_counter.value == CHECK_CORRECT_RESULT - assert check_result_len.value == 1 - check_stream_proc.join() - - operation_proc.join() - if operation == "START": - assert operation_counter.value == OP_AFTER_FETCHALL - assert common.get_is_running(cursor, "test_stream") - else: - assert operation_counter.value == OP_ALREADY_STOPPED_EXCEPTION - assert not common.get_is_running(cursor, "test_stream") - - finally: - # to make sure CHECK STREAM finishes - producer.send(topics[0], SIMPLE_MSG).get(timeout=60) - if check_stream_proc.is_alive(): - check_stream_proc.terminate() - if operation_proc.is_alive(): - operation_proc.terminate() - - -def test_check_already_started_stream(topics, connection): - assert len(topics) > 0 - cursor = connection.cursor() - - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM started_stream " - f"TOPICS {topics[0]} " - f"TRANSFORM transform.simple", - ) - common.start_stream(cursor, "started_stream") - - with pytest.raises(mgclient.DatabaseError): - common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream") - - -def test_start_checked_stream_after_timeout(topics, connection): - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test_stream " - f"TOPICS {topics[0]} " - f"TRANSFORM transform.simple", - ) - - timeout_ms = 2000 - - def call_check(): - common.execute_and_fetch_all( - common.connect().cursor(), - f"CHECK STREAM test_stream TIMEOUT {timeout_ms}", - ) - - check_stream_proc = Process(target=call_check, daemon=True) - - start = time.time() - check_stream_proc.start() - assert common.timed_wait( - lambda: common.get_is_running(cursor, "test_stream") - ) - common.start_stream(cursor, "test_stream") - end = time.time() - - assert ( - end - start - ) < 1.3 * timeout_ms, "The START STREAM was blocked too long" - assert common.get_is_running(cursor, "test_stream") - common.stop_stream(cursor, "test_stream") - - -def test_restart_after_error(producer, topics, connection): - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test_stream " - f"TOPICS {topics[0]} " - f"TRANSFORM transform.query", - ) - - common.start_stream(cursor, "test_stream") - time.sleep(1) - - producer.send(topics[0], SIMPLE_MSG).get(timeout=60) - assert common.timed_wait( - lambda: not common.get_is_running(cursor, "test_stream") - ) - - common.start_stream(cursor, "test_stream") - time.sleep(1) - producer.send(topics[0], b"CREATE (n:VERTEX { id : 42 })") - assert common.check_one_result_row( - cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n" - ) - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_bootstrap_server(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - local = "localhost:9092" - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {','.join(topics)} " - f"TRANSFORM {transformation} " - f"BOOTSTRAP_SERVERS '{local}'", - ) - common.start_stream(cursor, "test") - time.sleep(5) - - for topic in topics: - producer.send(topic, SIMPLE_MSG).get(timeout=60) - - for topic in topics: - common.check_vertex_exists_with_topic_and_payload( - cursor, topic, SIMPLE_MSG - 
) - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_bootstrap_server_empty(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - with pytest.raises(mgclient.DatabaseError): - common.execute_and_fetch_all( - cursor, - "CREATE KAFKA STREAM test " - f"TOPICS {','.join(topics)} " - f"TRANSFORM {transformation} " - "BOOTSTRAP_SERVERS ''", - ) - - -@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK) -def test_set_offset(producer, topics, connection, transformation): - assert len(topics) > 0 - cursor = connection.cursor() - common.execute_and_fetch_all( - cursor, - "CREATE STREAM test " - f"TOPICS {topics[0]} " - f"TRANSFORM {transformation} " - "BATCH_SIZE 1", - ) - - messages = [f"{i} message" for i in range(1, 21)] - for message in messages: - producer.send(topics[0], message.encode()).get(timeout=60) - - def consume(expected_msgs): - common.start_stream(cursor, "test") - if len(expected_msgs) == 0: - time.sleep(2) - else: - assert common.check_one_result_row( - cursor, - ( - f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})" - "RETURN n" - ), - ) - common.stop_stream(cursor, "test") - res = common.execute_and_fetch_all( - cursor, "MATCH (n) RETURN n.payload" - ) - return res - - def execute_set_offset_and_consume(id, expected_msgs): - common.execute_and_fetch_all( - cursor, f"CALL mg.kafka_set_stream_offset('test', {id})" - ) - return consume(expected_msgs) - - with pytest.raises(mgclient.DatabaseError): - res = common.execute_and_fetch_all( - cursor, "CALL mg.kafka_set_stream_offset('foo', 10)" - ) - - def comparison_check(a, b): - return a == str(b).strip("'(,)") - - res = execute_set_offset_and_consume(10, messages[10:]) - assert len(res) == 10 - assert all([comparison_check(a, b) for a, b in zip(messages[10:], res)]) - common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") - - res = execute_set_offset_and_consume(-1, messages) - assert len(res) == len(messages) - assert all([comparison_check(a, b) for a, b in zip(messages, res)]) - res = common.execute_and_fetch_all(cursor, "MATCH (n) return n.offset") - assert all([comparison_check(str(i), res[i]) for i in range(1, 20)]) - res = common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") - - res = execute_set_offset_and_consume(-2, []) - assert len(res) == 0 - last_msg = "Final Message" - producer.send(topics[0], last_msg.encode()).get(timeout=60) - res = consume([last_msg]) - assert len(res) == 1 - assert comparison_check("Final Message", res[0]) - common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n") - - -if __name__ == "__main__": - sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/streams/transformations/kafka_transform.py b/tests/e2e/streams/transformations/kafka_transform.py index 52f234bd0..4967e0f40 100644 --- a/tests/e2e/streams/transformations/kafka_transform.py +++ b/tests/e2e/streams/transformations/kafka_transform.py @@ -29,7 +29,7 @@ def simple( CREATE (n:MESSAGE {{ timestamp: '{message.timestamp()}', payload: '{payload_as_str}', - offset: {message.offset()} + offset: '{message.offset()}', topic: '{message.topic_name()}' }})""", parameters=None))
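Illustrative sketch (an assumption, not part of the patch above): after this change, Message.offset() in include/mgp.py is supported only for Kafka stream sources and raises InvalidArgumentError otherwise, mirroring the MGP_ERROR_INVALID_ARGUMENT now documented for mgp_message_offset in the C API. Assuming the mgp.py transformation API already used by tests/e2e/streams/transformations/kafka_transform.py, a transformation that may also receive non-Kafka messages could guard the call as below; the transformation name, node label and property layout are hypothetical.

import mgp


@mgp.transformation
def offset_aware(context: mgp.TransCtx,
                 messages: mgp.Messages
                 ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result_queries = []
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        payload_as_str = message.payload().decode("utf-8")
        try:
            # Supported stream sources: Kafka (see mgp_message_offset above).
            offset = message.offset()
        except mgp.InvalidArgumentError:
            # Message comes from a source without offsets; store the vertex without one.
            offset = None
        result_queries.append(mgp.Record(
            query="CREATE (:MESSAGE {payload: $payload, offset: $offset})",
            parameters={"payload": payload_as_str, "offset": offset}))
    return result_queries

For Kafka streams, the consumer offset can then be moved explicitly through the procedure registered in Streams::Streams, e.g. CALL mg.kafka_set_stream_offset('test', 10), as exercised by test_set_offset in tests/e2e/streams/kafka_streams_tests.py.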