Small polishing and fixes
parent e8976e0f1c
commit 12f4e0068a
@@ -1470,6 +1470,9 @@ enum mgp_error mgp_message_key_size(struct mgp_message *message, size_t *result)
enum mgp_error mgp_message_timestamp(struct mgp_message *message, int64_t *result);

/// Get the message offset from a message.
/// Supported stream sources:
/// - Kafka
/// Return MGP_ERROR_INVALID_ARGUMENT if the message is from an unsupported stream source.
enum mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result);

/// Get the number of messages contained in the mgp_messages list
@@ -1340,6 +1340,12 @@ class Message:
        return self._message.timestamp()

    def offset(self) -> int:
        """
        Supported stream sources:
          - Kafka

        Raise InvalidArgumentError if the message is from an unsupported stream source.
        """
        if not self.is_valid():
            raise InvalidMessageError()
        return self._message.offset()
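The Python wrapper above mirrors the new C API: Message.offset() is only meaningful for Kafka-backed streams and, per its docstring, raises InvalidArgumentError otherwise. A minimal sketch of guarding against that in user code follows; the helper name and the mgp.InvalidArgumentError attribute are assumptions, not part of this commit.

import mgp

def message_offset_or_none(message: mgp.Message):
    # offset() raises InvalidArgumentError for stream sources that do not
    # support offsets (currently everything except Kafka), per the
    # docstring above; treat that case as "no offset available".
    try:
        return message.offset()
    except mgp.InvalidArgumentError:
        return None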
@@ -60,7 +60,7 @@ index 508e4f4..87c5f2a 100644
decompressed.bytesWritten(uncompressedSize);
decoded = decompressed;
diff --git a/pulsar-client-cpp/lib/lz4/lz4.c b/pulsar-client-cpp/lib/lz4/lz4.c
index 08cf6b5..d74c287 100644
index 08cf6b5..07d3e01 100644
--- a/pulsar-client-cpp/lib/lz4/lz4.c
+++ b/pulsar-client-cpp/lib/lz4/lz4.c
@@ -45,7 +45,7 @@
@@ -174,7 +174,7 @@ index 08cf6b5..d74c287 100644

#define KB *(1 <<10)
#define MB *(1 <<20)
@@ -239,7 +239,7 @@ static const int LZ4_minLength = (MFLIMIT+1);
@@ -239,15 +239,15 @@ static const int LZ4_minLength = (MFLIMIT+1);
/**************************************
* Common Utils
**************************************/
@@ -183,9 +183,10 @@ index 08cf6b5..d74c287 100644

/**************************************
@@ -247,7 +247,7 @@ static const int LZ4_minLength = (MFLIMIT+1);
* Common functions
**************************************/
static unsigned LZ4_NbCommonBytes (register size_t val)
-static unsigned LZ4_NbCommonBytes (register size_t val)
+static unsigned PULSAR_LZ4_NbCommonBytes (register size_t val)
{
- if (LZ4_isLittleEndian())
+ if (PULSAR_LZ4_isLittleEndian())
@@ -206,7 +207,8 @@ index 08cf6b5..d74c287 100644
- size_t diff = LZ4_read_ARCH(pMatch) ^ LZ4_read_ARCH(pIn);
+ size_t diff = PULSAR_LZ4_read_ARCH(pMatch) ^ PULSAR_LZ4_read_ARCH(pIn);
if (!diff) { pIn+=STEPSIZE; pMatch+=STEPSIZE; continue; }
pIn += LZ4_NbCommonBytes(diff);
- pIn += LZ4_NbCommonBytes(diff);
+ pIn += PULSAR_LZ4_NbCommonBytes(diff);
return (unsigned)(pIn - pStart);
}

@@ -217,11 +219,13 @@ index 08cf6b5..d74c287 100644
if ((pIn<pInLimit) && (*pMatch == *pIn)) pIn++;
return (unsigned)(pIn - pStart);
}
@@ -340,7 +340,7 @@ static unsigned LZ4_count(const BYTE* pIn, const BYTE* pMatch, const BYTE* pInLi
@@ -339,8 +339,8 @@ static unsigned LZ4_count(const BYTE* pIn, const BYTE* pMatch, const BYTE* pInLi
#define HASHTABLESIZE (1 << LZ4_MEMORY_USAGE)
#define HASH_SIZE_U32 (1 << LZ4_HASHLOG) /* required as macro for static allocation */

static const int LZ4_64Klimit = ((64 KB) + (MFLIMIT-1));
-static const int LZ4_64Klimit = ((64 KB) + (MFLIMIT-1));
-static const U32 LZ4_skipTrigger = 6; /* Increase this value ==> compression run slower on incompressible data */
+static const int PULSAR_LZ4_64Klimit = ((64 KB) + (MFLIMIT-1));
+static const U32 PULSAR_LZ4_skipTrigger = 6; /* Increase this value ==> compression run slower on incompressible data */

@@ -331,11 +335,13 @@ index 08cf6b5..d74c287 100644

const BYTE* ip = (const BYTE*) source;
const BYTE* base;
@@ -483,11 +483,11 @@ FORCE_INLINE int LZ4_compress_generic(
@@ -482,12 +482,12 @@ FORCE_INLINE int LZ4_compress_generic(
lowLimit = (const BYTE*)source;
break;
}
if ((tableType == byU16) && (inputSize>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
- if ((tableType == byU16) && (inputSize>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
- if (inputSize<LZ4_minLength) goto _last_literals; /* Input too small, no compression (all literals) */
+ if ((tableType == byU16) && (inputSize>=PULSAR_LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
+ if (inputSize<PULSAR_LZ4_minLength) goto _last_literals; /* Input too small, no compression (all literals) */

/* First Byte */
@@ -467,8 +473,9 @@ index 08cf6b5..d74c287 100644
- if (maxOutputSize >= LZ4_compressBound(inputSize))
+ if (maxOutputSize >= PULSAR_LZ4_compressBound(inputSize))
{
if (inputSize < LZ4_64Klimit)
- if (inputSize < LZ4_64Klimit)
- return LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, byU16, noDict, noDictIssue, acceleration);
+ if (inputSize < PULSAR_LZ4_64Klimit)
+ return PULSAR_LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, byU16, noDict, noDictIssue, acceleration);
else
- return LZ4_compress_generic(state, source, dest, inputSize, 0, notLimited, LZ4_64bits() ? byU32 : byPtr, noDict, noDictIssue, acceleration);
@@ -476,8 +483,9 @@ index 08cf6b5..d74c287 100644
}
else
{
if (inputSize < LZ4_64Klimit)
- if (inputSize < LZ4_64Klimit)
- return LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration);
+ if (inputSize < PULSAR_LZ4_64Klimit)
+ return PULSAR_LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration);
else
- return LZ4_compress_generic(state, source, dest, inputSize, maxOutputSize, limitedOutput, LZ4_64bits() ? byU32 : byPtr, noDict, noDictIssue, acceleration);
@@ -526,8 +534,9 @@ index 08cf6b5..d74c287 100644
- LZ4_resetStream(&ctx);
+ PULSAR_LZ4_resetStream(&ctx);

if (inputSize < LZ4_64Klimit)
- if (inputSize < LZ4_64Klimit)
- return LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration);
+ if (inputSize < PULSAR_LZ4_64Klimit)
+ return PULSAR_LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, byU16, noDict, noDictIssue, acceleration);
else
- return LZ4_compress_generic(&ctx, source, dest, inputSize, maxOutputSize, limitedOutput, LZ4_64bits() ? byU32 : byPtr, noDict, noDictIssue, acceleration);
@@ -544,11 +553,13 @@ index 08cf6b5..d74c287 100644
void* const ctx,
const char* const src,
char* const dst,
@@ -748,12 +748,12 @@ static int LZ4_compress_destSize_generic(
@@ -747,13 +747,13 @@ static int LZ4_compress_destSize_generic(
/* Init conditions */
if (targetDstSize < 1) return 0; /* Impossible to store anything */
if ((U32)*srcSizePtr > (U32)LZ4_MAX_INPUT_SIZE) return 0; /* Unsupported input size, too large (or negative) */
if ((tableType == byU16) && (*srcSizePtr>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
- if ((tableType == byU16) && (*srcSizePtr>=LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
- if (*srcSizePtr<LZ4_minLength) goto _last_literals; /* Input too small, no compression (all literals) */
+ if ((tableType == byU16) && (*srcSizePtr>=PULSAR_LZ4_64Klimit)) return 0; /* Size too large (not within 64K limit) */
+ if (*srcSizePtr<PULSAR_LZ4_minLength) goto _last_literals; /* Input too small, no compression (all literals) */

/* First Byte */
@@ -655,8 +666,9 @@ index 08cf6b5..d74c287 100644
}
else
{
if (*srcSizePtr < LZ4_64Klimit)
- if (*srcSizePtr < LZ4_64Klimit)
- return LZ4_compress_destSize_generic(state, src, dst, srcSizePtr, targetDstSize, byU16);
+ if (*srcSizePtr < PULSAR_LZ4_64Klimit)
+ return PULSAR_LZ4_compress_destSize_generic(state, src, dst, srcSizePtr, targetDstSize, byU16);
else
- return LZ4_compress_destSize_generic(state, src, dst, srcSizePtr, targetDstSize, LZ4_64bits() ? byU32 : byPtr);
@@ -2612,15 +2612,10 @@ mgp_error mgp_message_timestamp(mgp_message *message, int64_t *result) {
mgp_error mgp_message_offset(struct mgp_message *message, int64_t *result) {
  return WrapExceptions(
      [message] {
        return std::visit(
            []<typename T>(T &&msg) -> int64_t {
              using MessageType = std::decay_t<T>;
              if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
                return msg->Offset();
              } else {
                throw std::invalid_argument("Invalid source type");
              }
            },
        return std::visit(utils::Overloaded{[](const mgp_message::KafkaMessage &msg) { return msg->Offset(); },
                                            [](const auto &msg) -> int64_t {
                                              throw InvalidMessageFunction(MessageToStreamSourceType(msg), "offset");
                                            }},
            message->msg);
      },
      result);
@@ -31,6 +31,7 @@
#include "utils/memory.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/pmr/string.hpp"
#include "utils/variant_helpers.hpp"

namespace EventCounter {
extern const Event MessagesConsumed;
@@ -156,15 +157,6 @@ void from_json(const nlohmann::json &data, StreamStatus<TStream> &status) {
  from_json(data, status.info);
}

namespace {
template <class... Ts>
struct Overloaded : Ts... {
  using Ts::operator()...;
};
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
}  // namespace

Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path directory)
    : interpreter_context_(interpreter_context), storage_(std::move(directory)) {
  constexpr std::string_view proc_name = "kafka_set_stream_offset";
@@ -176,12 +168,12 @@ Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path
    const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
    auto lock_ptr = streams_.Lock();
    auto it = GetStream(*lock_ptr, std::string(stream_name));
    std::visit(Overloaded{[&](StreamData<KafkaStream> &kafka_stream) {
    std::visit(utils::Overloaded{
                   [&](StreamData<KafkaStream> &kafka_stream) {
                     auto stream_source_ptr = kafka_stream.stream_source->Lock();
                     const auto error = stream_source_ptr->SetStreamOffset(offset);
                     if (error.HasError()) {
                       MG_ASSERT(
                           mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
                       MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
                                 "Unable to set procedure error message of procedure: {}", proc_name);
                     }
                   },
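The hunks above register the kafka_set_stream_offset query procedure and route it through Streams::SetStreamOffset. A hedged sketch of how a client exercises it, reusing the common helpers from the e2e tests later in this commit; the offset semantics (explicit offset, -1 for the beginning, -2 for the end of the topic) are inferred from test_set_offset and should be read as assumptions rather than documented behaviour.

import common  # e2e test helper module used by the Kafka stream tests below

def seek_stream(stream_name, offset):
    # Calls the procedure registered in Streams::Streams() above.
    cursor = common.connect().cursor()
    common.execute_and_fetch_all(
        cursor, f"CALL mg.kafka_set_stream_offset('{stream_name}', {offset})"
    )

# seek_stream("test", 10)   # continue from offset 10
# seek_stream("test", -1)   # replay from the beginning of the topic
# seek_stream("test", -2)   # skip to the end of the topic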
@@ -1,466 +0,0 @@
|
||||
// Copyright 2021 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
// License, and you may not use this file except in compliance with the Business Source License.
|
||||
//
|
||||
// As of the Change Date specified in that file, in accordance with
|
||||
// the Business Source License, use of this software will be governed
|
||||
// by the Apache License, Version 2.0, included in the file
|
||||
// licenses/APL.txt.
|
||||
|
||||
#include "query/streams.hpp"
|
||||
|
||||
#include <shared_mutex>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <json/json.hpp>
|
||||
#include "query/db_accessor.hpp"
|
||||
#include "query/discard_value_stream.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/procedure//mg_procedure_helpers.hpp"
|
||||
#include "query/procedure/mg_procedure_impl.hpp"
|
||||
#include "query/procedure/module.hpp"
|
||||
#include "query/typed_value.hpp"
|
||||
#include "utils/event_counter.hpp"
|
||||
#include "utils/memory.hpp"
|
||||
#include "utils/on_scope_exit.hpp"
|
||||
#include "utils/pmr/string.hpp"
|
||||
|
||||
namespace EventCounter {
|
||||
extern const Event MessagesConsumed;
|
||||
} // namespace EventCounter
|
||||
|
||||
namespace query {
|
||||
|
||||
using Consumer = integrations::kafka::Consumer;
|
||||
using ConsumerInfo = integrations::kafka::ConsumerInfo;
|
||||
using Message = integrations::kafka::Message;
|
||||
namespace {
|
||||
constexpr auto kExpectedTransformationResultSize = 2;
|
||||
const utils::pmr::string query_param_name{"query", utils::NewDeleteResource()};
|
||||
const utils::pmr::string params_param_name{"parameters", utils::NewDeleteResource()};
|
||||
const std::map<std::string, storage::PropertyValue> empty_parameters{};
|
||||
|
||||
auto GetStream(auto &map, const std::string &stream_name) {
|
||||
if (auto it = map.find(stream_name); it != map.end()) {
|
||||
return it;
|
||||
}
|
||||
throw StreamsException("Couldn't find stream '{}'", stream_name);
|
||||
}
|
||||
|
||||
void CallCustomTransformation(const std::string &transformation_name, const std::vector<Message> &messages,
|
||||
mgp_result &result, storage::Storage::Accessor &storage_accessor,
|
||||
utils::MemoryResource &memory_resource, const std::string &stream_name) {
|
||||
DbAccessor db_accessor{&storage_accessor};
|
||||
{
|
||||
auto maybe_transformation =
|
||||
procedure::FindTransformation(procedure::gModuleRegistry, transformation_name, utils::NewDeleteResource());
|
||||
|
||||
if (!maybe_transformation) {
|
||||
throw StreamsException("Couldn't find transformation {} for stream '{}'", transformation_name, stream_name);
|
||||
};
|
||||
const auto &trans = *maybe_transformation->second;
|
||||
mgp_messages mgp_messages{mgp_messages::storage_type{&memory_resource}};
|
||||
std::transform(messages.begin(), messages.end(), std::back_inserter(mgp_messages.messages),
|
||||
[](const integrations::kafka::Message &message) { return mgp_message{&message}; });
|
||||
mgp_graph graph{&db_accessor, storage::View::OLD, nullptr};
|
||||
mgp_memory memory{&memory_resource};
|
||||
result.rows.clear();
|
||||
result.error_msg.reset();
|
||||
result.signature = &trans.results;
|
||||
|
||||
MG_ASSERT(result.signature->size() == kExpectedTransformationResultSize);
|
||||
MG_ASSERT(result.signature->contains(query_param_name));
|
||||
MG_ASSERT(result.signature->contains(params_param_name));
|
||||
|
||||
spdlog::trace("Calling transformation in stream '{}'", stream_name);
|
||||
trans.cb(&mgp_messages, &graph, &result, &memory);
|
||||
}
|
||||
if (result.error_msg.has_value()) {
|
||||
throw StreamsException(result.error_msg->c_str());
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<TypedValue /*query*/, TypedValue /*parameters*/> ExtractTransformationResult(
|
||||
utils::pmr::map<utils::pmr::string, TypedValue> &&values, const std::string_view transformation_name,
|
||||
const std::string_view stream_name) {
|
||||
if (values.size() != kExpectedTransformationResultSize) {
|
||||
throw StreamsException(
|
||||
"Transformation '{}' in stream '{}' did not yield all fields (query, parameters) as required.",
|
||||
transformation_name, stream_name);
|
||||
}
|
||||
|
||||
auto get_value = [&](const utils::pmr::string &field_name) mutable -> TypedValue & {
|
||||
auto it = values.find(field_name);
|
||||
if (it == values.end()) {
|
||||
throw StreamsException{"Transformation '{}' in stream '{}' did not yield a record with '{}' field.",
|
||||
transformation_name, stream_name, field_name};
|
||||
};
|
||||
return it->second;
|
||||
};
|
||||
|
||||
auto &query_value = get_value(query_param_name);
|
||||
MG_ASSERT(query_value.IsString());
|
||||
auto ¶ms_value = get_value(params_param_name);
|
||||
MG_ASSERT(params_value.IsNull() || params_value.IsMap());
|
||||
return {std::move(query_value), std::move(params_value)};
|
||||
}
|
||||
} // namespace
|
||||
|
||||
// nlohmann::json doesn't support string_view access yet
|
||||
const std::string kStreamName{"name"};
|
||||
const std::string kTopicsKey{"topics"};
|
||||
const std::string kConsumerGroupKey{"consumer_group"};
|
||||
const std::string kBatchIntervalKey{"batch_interval"};
|
||||
const std::string kBatchSizeKey{"batch_size"};
|
||||
const std::string kIsRunningKey{"is_running"};
|
||||
const std::string kTransformationName{"transformation_name"};
|
||||
const std::string kOwner{"owner"};
|
||||
const std::string kBoostrapServers{"bootstrap_servers"};
|
||||
|
||||
void to_json(nlohmann::json &data, StreamStatus &&status) {
|
||||
auto &info = status.info;
|
||||
data[kStreamName] = std::move(status.name);
|
||||
data[kTopicsKey] = std::move(info.topics);
|
||||
data[kConsumerGroupKey] = info.consumer_group;
|
||||
|
||||
if (info.batch_interval) {
|
||||
data[kBatchIntervalKey] = info.batch_interval->count();
|
||||
} else {
|
||||
data[kBatchIntervalKey] = nullptr;
|
||||
}
|
||||
|
||||
if (info.batch_size) {
|
||||
data[kBatchSizeKey] = *info.batch_size;
|
||||
} else {
|
||||
data[kBatchSizeKey] = nullptr;
|
||||
}
|
||||
|
||||
data[kIsRunningKey] = status.is_running;
|
||||
data[kTransformationName] = status.info.transformation_name;
|
||||
|
||||
if (info.owner.has_value()) {
|
||||
data[kOwner] = std::move(*info.owner);
|
||||
} else {
|
||||
data[kOwner] = nullptr;
|
||||
}
|
||||
|
||||
data[kBoostrapServers] = std::move(info.bootstrap_servers);
|
||||
}
|
||||
|
||||
void from_json(const nlohmann::json &data, StreamStatus &status) {
|
||||
auto &info = status.info;
|
||||
data.at(kStreamName).get_to(status.name);
|
||||
data.at(kTopicsKey).get_to(info.topics);
|
||||
data.at(kConsumerGroupKey).get_to(info.consumer_group);
|
||||
|
||||
if (const auto batch_interval = data.at(kBatchIntervalKey); !batch_interval.is_null()) {
|
||||
using BatchInterval = decltype(info.batch_interval)::value_type;
|
||||
info.batch_interval = BatchInterval{batch_interval.get<BatchInterval::rep>()};
|
||||
} else {
|
||||
info.batch_interval = {};
|
||||
}
|
||||
|
||||
if (const auto batch_size = data.at(kBatchSizeKey); !batch_size.is_null()) {
|
||||
info.batch_size = batch_size.get<decltype(info.batch_size)::value_type>();
|
||||
} else {
|
||||
info.batch_size = {};
|
||||
}
|
||||
|
||||
data.at(kIsRunningKey).get_to(status.is_running);
|
||||
data.at(kTransformationName).get_to(status.info.transformation_name);
|
||||
|
||||
if (const auto &owner = data.at(kOwner); !owner.is_null()) {
|
||||
info.owner = owner.get<decltype(info.owner)::value_type>();
|
||||
} else {
|
||||
info.owner = {};
|
||||
}
|
||||
|
||||
info.owner = data.value(kBoostrapServers, "");
|
||||
}
|
||||
|
||||
Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers,
|
||||
std::filesystem::path directory)
|
||||
: interpreter_context_(interpreter_context),
|
||||
bootstrap_servers_(std::move(bootstrap_servers)),
|
||||
storage_(std::move(directory)) {
|
||||
constexpr std::string_view proc_name = "kafka_set_stream_offset";
|
||||
auto set_stream_offset = [ictx = interpreter_context, proc_name](mgp_list *args, mgp_graph * /*graph*/,
|
||||
mgp_result *result, mgp_memory * /*memory*/) {
|
||||
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
|
||||
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
|
||||
auto *arg_offset = procedure::Call<mgp_value *>(mgp_list_at, args, 1);
|
||||
const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
|
||||
const auto error = ictx->streams.SetStreamOffset(stream_name, offset);
|
||||
if (error.HasError()) {
|
||||
MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
|
||||
"Unable to set procedure error message of procedure: {}", proc_name);
|
||||
}
|
||||
};
|
||||
|
||||
mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource(), false);
|
||||
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
|
||||
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) == MGP_ERROR_NO_ERROR);
|
||||
|
||||
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
|
||||
}
|
||||
|
||||
void Streams::RestoreStreams() {
|
||||
spdlog::info("Loading streams...");
|
||||
auto locked_streams_map = streams_.Lock();
|
||||
MG_ASSERT(locked_streams_map->empty(), "Cannot restore streams when some streams already exist!");
|
||||
|
||||
for (const auto &[stream_name, stream_data] : storage_) {
|
||||
const auto get_failed_message = [&stream_name = stream_name](const std::string_view message,
|
||||
const std::string_view nested_message) {
|
||||
return fmt::format("Failed to load stream '{}', because: {} caused by {}", stream_name, message, nested_message);
|
||||
};
|
||||
|
||||
StreamStatus status;
|
||||
try {
|
||||
nlohmann::json::parse(stream_data).get_to(status);
|
||||
} catch (const nlohmann::json::type_error &exception) {
|
||||
spdlog::warn(get_failed_message("invalid type conversion", exception.what()));
|
||||
continue;
|
||||
} catch (const nlohmann::json::out_of_range &exception) {
|
||||
spdlog::warn(get_failed_message("non existing field", exception.what()));
|
||||
continue;
|
||||
}
|
||||
MG_ASSERT(status.name == stream_name, "Expected stream name is '{}', but got '{}'", status.name, stream_name);
|
||||
|
||||
try {
|
||||
auto it = CreateConsumer(*locked_streams_map, stream_name, std::move(status.info));
|
||||
if (status.is_running) {
|
||||
it->second.consumer->Lock()->Start();
|
||||
}
|
||||
spdlog::info("Stream '{}' is loaded", stream_name);
|
||||
} catch (const utils::BasicException &exception) {
|
||||
spdlog::warn(get_failed_message("unexpected error", exception.what()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Streams::Create(const std::string &stream_name, StreamInfo info) {
|
||||
auto locked_streams = streams_.Lock();
|
||||
auto it = CreateConsumer(*locked_streams, stream_name, std::move(info));
|
||||
|
||||
try {
|
||||
Persist(
|
||||
CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *it->second.consumer->ReadLock()));
|
||||
} catch (...) {
|
||||
locked_streams->erase(it);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void Streams::Drop(const std::string &stream_name) {
|
||||
auto locked_streams = streams_.Lock();
|
||||
|
||||
auto it = GetStream(*locked_streams, stream_name);
|
||||
|
||||
// streams_ is write locked, which means there is no access to it outside of this function, thus only the Test
|
||||
// function can be executing with the consumer, nothing else.
|
||||
// By acquiring the write lock here for the consumer, we make sure there is
|
||||
// no running Test function for this consumer, therefore it can be erased.
|
||||
it->second.consumer->Lock();
|
||||
locked_streams->erase(it);
|
||||
|
||||
if (!storage_.Delete(stream_name)) {
|
||||
throw StreamsException("Couldn't delete stream '{}' from persistent store!", stream_name);
|
||||
}
|
||||
|
||||
// TODO(antaljanosbenjamin) Release the transformation
|
||||
}
|
||||
|
||||
void Streams::Start(const std::string &stream_name) {
|
||||
auto locked_streams = streams_.Lock();
|
||||
auto it = GetStream(*locked_streams, stream_name);
|
||||
|
||||
auto locked_consumer = it->second.consumer->Lock();
|
||||
locked_consumer->Start();
|
||||
|
||||
Persist(CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *locked_consumer));
|
||||
}
|
||||
|
||||
void Streams::Stop(const std::string &stream_name) {
|
||||
auto locked_streams = streams_.Lock();
|
||||
auto it = GetStream(*locked_streams, stream_name);
|
||||
|
||||
auto locked_consumer = it->second.consumer->Lock();
|
||||
locked_consumer->Stop();
|
||||
|
||||
Persist(CreateStatus(stream_name, it->second.transformation_name, it->second.owner, *locked_consumer));
|
||||
}
|
||||
|
||||
void Streams::StartAll() {
|
||||
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
|
||||
auto locked_consumer = stream_data.consumer->Lock();
|
||||
if (!locked_consumer->IsRunning()) {
|
||||
locked_consumer->Start();
|
||||
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_consumer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void Streams::StopAll() {
|
||||
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
|
||||
auto locked_consumer = stream_data.consumer->Lock();
|
||||
if (locked_consumer->IsRunning()) {
|
||||
locked_consumer->Stop();
|
||||
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *locked_consumer));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<StreamStatus> Streams::GetStreamInfo() const {
|
||||
std::vector<StreamStatus> result;
|
||||
{
|
||||
for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) {
|
||||
result.emplace_back(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner,
|
||||
*stream_data.consumer->ReadLock()));
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout,
|
||||
std::optional<int64_t> batch_limit) const {
|
||||
// This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after that
|
||||
auto [locked_consumer,
|
||||
transformation_name] = [this, &stream_name]() -> std::pair<SynchronizedConsumer::ReadLockedPtr, std::string> {
|
||||
auto locked_streams = streams_.ReadLock();
|
||||
auto it = GetStream(*locked_streams, stream_name);
|
||||
return {it->second.consumer->ReadLock(), it->second.transformation_name};
|
||||
}();
|
||||
|
||||
auto *memory_resource = utils::NewDeleteResource();
|
||||
mgp_result result{nullptr, memory_resource};
|
||||
TransformationResult test_result;
|
||||
|
||||
auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, &stream_name,
|
||||
&transformation_name = transformation_name, &result,
|
||||
&test_result](const std::vector<Message> &messages) mutable {
|
||||
auto accessor = interpreter_context->db->Access();
|
||||
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
|
||||
|
||||
for (auto &row : result.rows) {
|
||||
auto [query, parameters] = ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
|
||||
std::vector<TypedValue> result_row;
|
||||
result_row.reserve(kExpectedTransformationResultSize);
|
||||
result_row.push_back(std::move(query));
|
||||
result_row.push_back(std::move(parameters));
|
||||
|
||||
test_result.push_back(std::move(result_row));
|
||||
}
|
||||
};
|
||||
|
||||
locked_consumer->Check(timeout, batch_limit, consumer_function);
|
||||
|
||||
return test_result;
|
||||
}
|
||||
|
||||
StreamStatus Streams::CreateStatus(const std::string &name, const std::string &transformation_name,
|
||||
const std::optional<std::string> &owner,
|
||||
const integrations::kafka::Consumer &consumer) {
|
||||
const auto &info = consumer.Info();
|
||||
return StreamStatus{name,
|
||||
StreamInfo{
|
||||
info.topics,
|
||||
info.consumer_group,
|
||||
info.batch_interval,
|
||||
info.batch_size,
|
||||
transformation_name,
|
||||
owner,
|
||||
},
|
||||
consumer.IsRunning()};
|
||||
}
|
||||
|
||||
Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std::string &stream_name,
|
||||
StreamInfo stream_info) {
|
||||
if (map.contains(stream_name)) {
|
||||
throw StreamsException{"Stream already exists with name '{}'", stream_name};
|
||||
}
|
||||
|
||||
auto *memory_resource = utils::NewDeleteResource();
|
||||
|
||||
auto consumer_function = [interpreter_context = interpreter_context_, memory_resource, stream_name,
|
||||
transformation_name = stream_info.transformation_name, owner = stream_info.owner,
|
||||
interpreter = std::make_shared<Interpreter>(interpreter_context_),
|
||||
result = mgp_result{nullptr, memory_resource}](
|
||||
const std::vector<integrations::kafka::Message> &messages) mutable {
|
||||
auto accessor = interpreter_context->db->Access();
|
||||
EventCounter::IncrementCounter(EventCounter::MessagesConsumed, messages.size());
|
||||
CallCustomTransformation(transformation_name, messages, result, accessor, *memory_resource, stream_name);
|
||||
|
||||
DiscardValueResultStream stream;
|
||||
|
||||
spdlog::trace("Start transaction in stream '{}'", stream_name);
|
||||
utils::OnScopeExit cleanup{[&interpreter, &result]() {
|
||||
result.rows.clear();
|
||||
interpreter->Abort();
|
||||
}};
|
||||
interpreter->BeginTransaction();
|
||||
|
||||
for (auto &row : result.rows) {
|
||||
spdlog::trace("Processing row in stream '{}'", stream_name);
|
||||
auto [query_value, params_value] =
|
||||
ExtractTransformationResult(std::move(row.values), transformation_name, stream_name);
|
||||
storage::PropertyValue params_prop{params_value};
|
||||
|
||||
std::string query{query_value.ValueString()};
|
||||
spdlog::trace("Executing query '{}' in stream '{}'", query, stream_name);
|
||||
auto prepare_result =
|
||||
interpreter->Prepare(query, params_prop.IsNull() ? empty_parameters : params_prop.ValueMap(), nullptr);
|
||||
if (!interpreter_context->auth_checker->IsUserAuthorized(owner, prepare_result.privileges)) {
|
||||
throw StreamsException{
|
||||
"Couldn't execute query '{}' for stream '{}' becuase the owner is not authorized to execute the "
|
||||
"query!",
|
||||
query, stream_name};
|
||||
}
|
||||
interpreter->PullAll(&stream);
|
||||
}
|
||||
|
||||
spdlog::trace("Commit transaction in stream '{}'", stream_name);
|
||||
interpreter->CommitTransaction();
|
||||
result.rows.clear();
|
||||
};
|
||||
|
||||
ConsumerInfo consumer_info{
|
||||
.consumer_name = stream_name,
|
||||
.topics = std::move(stream_info.topics),
|
||||
.consumer_group = std::move(stream_info.consumer_group),
|
||||
.batch_interval = stream_info.batch_interval,
|
||||
.batch_size = stream_info.batch_size,
|
||||
};
|
||||
|
||||
auto bootstrap_servers =
|
||||
stream_info.bootstrap_servers.empty() ? bootstrap_servers_ : std::move(stream_info.bootstrap_servers);
|
||||
auto insert_result = map.insert_or_assign(
|
||||
stream_name,
|
||||
StreamData{std::move(stream_info.transformation_name), std::move(stream_info.owner),
|
||||
std::make_unique<SynchronizedConsumer>(std::move(bootstrap_servers), std::move(consumer_info),
|
||||
std::move(consumer_function))});
|
||||
MG_ASSERT(insert_result.second, "Unexpected error during storing consumer '{}'", stream_name);
|
||||
return insert_result.first;
|
||||
}
|
||||
|
||||
void Streams::Persist(StreamStatus &&status) {
|
||||
const std::string stream_name = status.name;
|
||||
if (!storage_.Put(stream_name, nlohmann::json(std::move(status)).dump())) {
|
||||
throw StreamsException{"Couldn't persist steam data for stream '{}'", stream_name};
|
||||
}
|
||||
}
|
||||
|
||||
std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; }
|
||||
|
||||
utils::BasicResult<std::string> Streams::SetStreamOffset(const std::string_view stream_name, int64_t offset) {
|
||||
auto lock_ptr = streams_.Lock();
|
||||
auto it = GetStream(*lock_ptr, std::string(stream_name));
|
||||
auto consumer_lock_ptr = it->second.consumer->Lock();
|
||||
return consumer_lock_ptr->SetConsumerOffsets(offset);
|
||||
}
|
||||
|
||||
} // namespace query
|
@@ -353,20 +353,20 @@ def test_bootstrap_server_empty(


@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
def test_set_offset(producer, topics, connection, transformation):
assert len(topics) > 0
def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
assert len(kafka_topics) > 0
cursor = connection.cursor()
common.execute_and_fetch_all(
cursor,
"CREATE KAFKA STREAM test "
f"TOPICS {topics[0]} "
f"TOPICS {kafka_topics[0]} "
f"TRANSFORM {transformation} "
"BATCH_SIZE 1",
)

messages = [f"{i} message" for i in range(1, 21)]
for message in messages:
producer.send(topics[0], message.encode()).get(timeout=60)
kafka_producer.send(kafka_topics[0], message.encode()).get(timeout=60)

def consume(expected_msgs):
common.start_stream(cursor, "test")
@@ -415,7 +415,7 @@ def test_set_offset(producer, topics, connection, transformation):
res = execute_set_offset_and_consume(-2, [])
assert len(res) == 0
last_msg = "Final Message"
producer.send(topics[0], last_msg.encode()).get(timeout=60)
kafka_producer.send(kafka_topics[0], last_msg.encode()).get(timeout=60)
res = consume([last_msg])
assert len(res) == 1
assert comparison_check("Final Message", res[0])
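test_set_offset now consumes the renamed kafka_producer and kafka_topics fixtures instead of the generic producer/topics ones. Their definitions are not part of this commit view; the following is only a rough sketch of what such fixtures could look like with kafka-python (which matches the producer.send(...).get(timeout=60) calls in these tests). Every name, address, and setting below is an assumption.

import uuid

import pytest
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

@pytest.fixture
def kafka_topics():
    # Create a couple of uniquely named topics for the test and clean them up afterwards.
    admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
    topics = [f"topic_{uuid.uuid4().hex}" for _ in range(2)]
    admin.create_topics(
        [NewTopic(name=topic, num_partitions=1, replication_factor=1) for topic in topics]
    )
    yield topics
    admin.delete_topics(topics)
    admin.close()

@pytest.fixture
def kafka_producer():
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    yield producer
    producer.close()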
@@ -1,552 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# Copyright 2021 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import sys
|
||||
import pytest
|
||||
import mgclient
|
||||
import time
|
||||
from multiprocessing import Process, Value
|
||||
import common
|
||||
|
||||
# These are the indices of the query and parameters in the result of CHECK
|
||||
# STREAM query
|
||||
QUERY = 0
|
||||
PARAMS = 1
|
||||
|
||||
TRANSFORMATIONS_TO_CHECK = ["transform.simple", "transform.with_parameters"]
|
||||
|
||||
SIMPLE_MSG = b"message"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_simple(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM {transformation}",
|
||||
)
|
||||
common.start_stream(cursor, "test")
|
||||
time.sleep(5)
|
||||
|
||||
for topic in topics:
|
||||
producer.send(topic, SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
for topic in topics:
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topic, SIMPLE_MSG
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_separate_consumers(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
|
||||
stream_names = []
|
||||
for topic in topics:
|
||||
stream_name = "stream_" + topic
|
||||
stream_names.append(stream_name)
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
f"CREATE KAFKA STREAM {stream_name} "
|
||||
f"TOPICS {topic} "
|
||||
f"TRANSFORM {transformation}",
|
||||
)
|
||||
|
||||
for stream_name in stream_names:
|
||||
common.start_stream(cursor, stream_name)
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
for topic in topics:
|
||||
producer.send(topic, SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
for topic in topics:
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topic, SIMPLE_MSG
|
||||
)
|
||||
|
||||
|
||||
def test_start_from_last_committed_offset(producer, topics, connection):
|
||||
# This test creates a stream, consumes a message to have a committed
|
||||
# offset, then destroys the stream. A new message is sent before the
|
||||
# stream is recreated and then restarted. This simulates when Memgraph is
|
||||
# stopped (stream is destroyed) and then restarted (stream is recreated).
|
||||
# This is of course not as good as restarting memgraph would be, but
|
||||
# restarting Memgraph during a single workload cannot be done currently.
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
"TRANSFORM kafka_transform.simple",
|
||||
)
|
||||
common.start_stream(cursor, "test")
|
||||
time.sleep(1)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], SIMPLE_MSG
|
||||
)
|
||||
|
||||
common.stop_stream(cursor, "test")
|
||||
common.drop_stream(cursor, "test")
|
||||
|
||||
messages = [b"second message", b"third message"]
|
||||
for message in messages:
|
||||
producer.send(topics[0], message).get(timeout=60)
|
||||
|
||||
for message in messages:
|
||||
vertices_with_msg = common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"MATCH (n: MESSAGE {"
|
||||
f"payload: '{message.decode('utf-8')}'"
|
||||
"}) RETURN n",
|
||||
)
|
||||
|
||||
assert len(vertices_with_msg) == 0
|
||||
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
"TRANSFORM kafka_transform.simple",
|
||||
)
|
||||
common.start_stream(cursor, "test")
|
||||
|
||||
for message in messages:
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], message
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_check_stream(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM {transformation} "
|
||||
"BATCH_SIZE 1",
|
||||
)
|
||||
common.start_stream(cursor, "test")
|
||||
time.sleep(1)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
common.stop_stream(cursor, "test")
|
||||
|
||||
messages = [b"first message", b"second message", b"third message"]
|
||||
for message in messages:
|
||||
producer.send(topics[0], message).get(timeout=60)
|
||||
|
||||
def check_check_stream(batch_limit):
|
||||
assert (
|
||||
transformation == "transform.simple"
|
||||
or transformation == "transform.with_parameters"
|
||||
)
|
||||
test_results = common.execute_and_fetch_all(
|
||||
cursor, f"CHECK STREAM test BATCH_LIMIT {batch_limit}"
|
||||
)
|
||||
assert len(test_results) == batch_limit
|
||||
|
||||
for i in range(batch_limit):
|
||||
message_as_str = messages[i].decode("utf-8")
|
||||
if transformation == "transform.simple":
|
||||
assert f"payload: '{message_as_str}'" in test_results[i][QUERY]
|
||||
assert test_results[i][PARAMS] is None
|
||||
else:
|
||||
assert test_results[i][QUERY] == (
|
||||
"CREATE (n:MESSAGE "
|
||||
"{timestamp: $timestamp, "
|
||||
"payload: $payload, "
|
||||
"topic: $topic, "
|
||||
"offset: $offset})"
|
||||
)
|
||||
parameters = test_results[i][PARAMS]
|
||||
# this is not a very sofisticated test, but checks if
|
||||
# timestamp has some kind of value
|
||||
assert parameters["timestamp"] > 1000000000000
|
||||
assert parameters["topic"] == topics[0]
|
||||
assert parameters["payload"] == message_as_str
|
||||
|
||||
check_check_stream(1)
|
||||
check_check_stream(2)
|
||||
check_check_stream(3)
|
||||
common.start_stream(cursor, "test")
|
||||
|
||||
for message in messages:
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topics[0], message
|
||||
)
|
||||
|
||||
|
||||
def test_show_streams(producer, topics, connection):
|
||||
assert len(topics) > 1
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM default_values "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple "
|
||||
f"BOOTSTRAP_SERVERS 'localhost:9092'",
|
||||
)
|
||||
|
||||
consumer_group = "my_special_consumer_group"
|
||||
batch_interval = 42
|
||||
batch_size = 3
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM complex_values "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM transform.with_parameters "
|
||||
f"CONSUMER_GROUP {consumer_group} "
|
||||
f"BATCH_INTERVAL {batch_interval} "
|
||||
f"BATCH_SIZE {batch_size} ",
|
||||
)
|
||||
|
||||
assert len(common.execute_and_fetch_all(cursor, "SHOW STREAMS")) == 2
|
||||
|
||||
common.check_stream_info(
|
||||
cursor,
|
||||
"default_values",
|
||||
("default_values", None, None, "transform.simple", None, False),
|
||||
)
|
||||
|
||||
common.check_stream_info(
|
||||
cursor,
|
||||
"complex_values",
|
||||
(
|
||||
"complex_values",
|
||||
batch_interval,
|
||||
batch_size,
|
||||
"transform.with_parameters",
|
||||
None,
|
||||
False,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("operation", ["START", "STOP"])
|
||||
def test_start_and_stop_during_check(producer, topics, connection, operation):
|
||||
# This test is quite complex. The goal is to call START/STOP queries
|
||||
# while a CHECK query is waiting for its result. Because the Global
|
||||
# Interpreter Lock, running queries on multiple threads is not useful,
|
||||
# because only one of them can call Cursor::execute at a time. Therefore
|
||||
# multiple processes are used to execute the queries, because different
|
||||
# processes have different GILs.
|
||||
# The counter variables are thread- and process-safe variables to
|
||||
# synchronize between the different processes. Each value represents a
|
||||
# specific phase of the execution of the processes.
|
||||
assert len(topics) > 1
|
||||
assert operation == "START" or operation == "STOP"
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple",
|
||||
)
|
||||
|
||||
check_counter = Value("i", 0)
|
||||
check_result_len = Value("i", 0)
|
||||
operation_counter = Value("i", 0)
|
||||
|
||||
CHECK_BEFORE_EXECUTE = 1
|
||||
CHECK_AFTER_FETCHALL = 2
|
||||
CHECK_CORRECT_RESULT = 3
|
||||
CHECK_INCORRECT_RESULT = 4
|
||||
|
||||
def call_check(counter, result_len):
|
||||
# This process will call the CHECK query and increment the counter
|
||||
# based on its progress and expected behavior
|
||||
connection = common.connect()
|
||||
cursor = connection.cursor()
|
||||
counter.value = CHECK_BEFORE_EXECUTE
|
||||
result = common.execute_and_fetch_all(
|
||||
cursor, "CHECK STREAM test_stream"
|
||||
)
|
||||
result_len.value = len(result)
|
||||
counter.value = CHECK_AFTER_FETCHALL
|
||||
if len(result) > 0 and "payload: 'message'" in result[0][QUERY]:
|
||||
counter.value = CHECK_CORRECT_RESULT
|
||||
else:
|
||||
counter.value = CHECK_INCORRECT_RESULT
|
||||
|
||||
OP_BEFORE_EXECUTE = 1
|
||||
OP_AFTER_FETCHALL = 2
|
||||
OP_ALREADY_STOPPED_EXCEPTION = 3
|
||||
OP_INCORRECT_ALREADY_STOPPED_EXCEPTION = 4
|
||||
OP_UNEXPECTED_EXCEPTION = 5
|
||||
|
||||
def call_operation(counter):
|
||||
# This porcess will call the query with the specified operation and
|
||||
# increment the counter based on its progress and expected behavior
|
||||
connection = common.connect()
|
||||
cursor = connection.cursor()
|
||||
counter.value = OP_BEFORE_EXECUTE
|
||||
try:
|
||||
common.execute_and_fetch_all(
|
||||
cursor, f"{operation} STREAM test_stream"
|
||||
)
|
||||
counter.value = OP_AFTER_FETCHALL
|
||||
except mgclient.DatabaseError as e:
|
||||
if "Kafka consumer test_stream is already stopped" in str(e):
|
||||
counter.value = OP_ALREADY_STOPPED_EXCEPTION
|
||||
else:
|
||||
counter.value = OP_INCORRECT_ALREADY_STOPPED_EXCEPTION
|
||||
except Exception:
|
||||
counter.value = OP_UNEXPECTED_EXCEPTION
|
||||
|
||||
check_stream_proc = Process(
|
||||
target=call_check, daemon=True, args=(check_counter, check_result_len)
|
||||
)
|
||||
operation_proc = Process(
|
||||
target=call_operation, daemon=True, args=(operation_counter,)
|
||||
)
|
||||
|
||||
try:
|
||||
check_stream_proc.start()
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
assert common.timed_wait(
|
||||
lambda: check_counter.value == CHECK_BEFORE_EXECUTE
|
||||
)
|
||||
assert common.timed_wait(
|
||||
lambda: common.get_is_running(cursor, "test_stream")
|
||||
)
|
||||
assert check_counter.value == CHECK_BEFORE_EXECUTE, (
|
||||
"SHOW STREAMS " "was blocked until the end of CHECK STREAM"
|
||||
)
|
||||
operation_proc.start()
|
||||
assert common.timed_wait(
|
||||
lambda: operation_counter.value == OP_BEFORE_EXECUTE
|
||||
)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
assert common.timed_wait(
|
||||
lambda: check_counter.value > CHECK_AFTER_FETCHALL
|
||||
)
|
||||
assert check_counter.value == CHECK_CORRECT_RESULT
|
||||
assert check_result_len.value == 1
|
||||
check_stream_proc.join()
|
||||
|
||||
operation_proc.join()
|
||||
if operation == "START":
|
||||
assert operation_counter.value == OP_AFTER_FETCHALL
|
||||
assert common.get_is_running(cursor, "test_stream")
|
||||
else:
|
||||
assert operation_counter.value == OP_ALREADY_STOPPED_EXCEPTION
|
||||
assert not common.get_is_running(cursor, "test_stream")
|
||||
|
||||
finally:
|
||||
# to make sure CHECK STREAM finishes
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
if check_stream_proc.is_alive():
|
||||
check_stream_proc.terminate()
|
||||
if operation_proc.is_alive():
|
||||
operation_proc.terminate()
|
||||
|
||||
|
||||
def test_check_already_started_stream(topics, connection):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM started_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple",
|
||||
)
|
||||
common.start_stream(cursor, "started_stream")
|
||||
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
common.execute_and_fetch_all(cursor, "CHECK STREAM started_stream")
|
||||
|
||||
|
||||
def test_start_checked_stream_after_timeout(topics, connection):
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.simple",
|
||||
)
|
||||
|
||||
timeout_ms = 2000
|
||||
|
||||
def call_check():
|
||||
common.execute_and_fetch_all(
|
||||
common.connect().cursor(),
|
||||
f"CHECK STREAM test_stream TIMEOUT {timeout_ms}",
|
||||
)
|
||||
|
||||
check_stream_proc = Process(target=call_check, daemon=True)
|
||||
|
||||
start = time.time()
|
||||
check_stream_proc.start()
|
||||
assert common.timed_wait(
|
||||
lambda: common.get_is_running(cursor, "test_stream")
|
||||
)
|
||||
common.start_stream(cursor, "test_stream")
|
||||
end = time.time()
|
||||
|
||||
assert (
|
||||
end - start
|
||||
) < 1.3 * timeout_ms, "The START STREAM was blocked too long"
|
||||
assert common.get_is_running(cursor, "test_stream")
|
||||
common.stop_stream(cursor, "test_stream")
|
||||
|
||||
|
||||
def test_restart_after_error(producer, topics, connection):
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test_stream "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM transform.query",
|
||||
)
|
||||
|
||||
common.start_stream(cursor, "test_stream")
|
||||
time.sleep(1)
|
||||
|
||||
producer.send(topics[0], SIMPLE_MSG).get(timeout=60)
|
||||
assert common.timed_wait(
|
||||
lambda: not common.get_is_running(cursor, "test_stream")
|
||||
)
|
||||
|
||||
common.start_stream(cursor, "test_stream")
|
||||
time.sleep(1)
|
||||
producer.send(topics[0], b"CREATE (n:VERTEX { id : 42 })")
|
||||
assert common.check_one_result_row(
|
||||
cursor, "MATCH (n:VERTEX { id : 42 }) RETURN n"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_bootstrap_server(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
local = "localhost:9092"
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM {transformation} "
|
||||
f"BOOTSTRAP_SERVERS '{local}'",
|
||||
)
|
||||
common.start_stream(cursor, "test")
|
||||
time.sleep(5)
|
||||
|
||||
for topic in topics:
|
||||
producer.send(topic, SIMPLE_MSG).get(timeout=60)
|
||||
|
||||
for topic in topics:
|
||||
common.check_vertex_exists_with_topic_and_payload(
|
||||
cursor, topic, SIMPLE_MSG
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_bootstrap_server_empty(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE KAFKA STREAM test "
|
||||
f"TOPICS {','.join(topics)} "
|
||||
f"TRANSFORM {transformation} "
|
||||
"BOOTSTRAP_SERVERS ''",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK)
|
||||
def test_set_offset(producer, topics, connection, transformation):
|
||||
assert len(topics) > 0
|
||||
cursor = connection.cursor()
|
||||
common.execute_and_fetch_all(
|
||||
cursor,
|
||||
"CREATE STREAM test "
|
||||
f"TOPICS {topics[0]} "
|
||||
f"TRANSFORM {transformation} "
|
||||
"BATCH_SIZE 1",
|
||||
)
|
||||
|
||||
messages = [f"{i} message" for i in range(1, 21)]
|
||||
for message in messages:
|
||||
producer.send(topics[0], message.encode()).get(timeout=60)
|
||||
|
||||
def consume(expected_msgs):
|
||||
common.start_stream(cursor, "test")
|
||||
if len(expected_msgs) == 0:
|
||||
time.sleep(2)
|
||||
else:
|
||||
assert common.check_one_result_row(
|
||||
cursor,
|
||||
(
|
||||
f"MATCH (n: MESSAGE {{payload: '{expected_msgs[-1]}'}})"
|
||||
"RETURN n"
|
||||
),
|
||||
)
|
||||
common.stop_stream(cursor, "test")
|
||||
res = common.execute_and_fetch_all(
|
||||
cursor, "MATCH (n) RETURN n.payload"
|
||||
)
|
||||
return res
|
||||
|
||||
def execute_set_offset_and_consume(id, expected_msgs):
|
||||
common.execute_and_fetch_all(
|
||||
cursor, f"CALL mg.kafka_set_stream_offset('test', {id})"
|
||||
)
|
||||
return consume(expected_msgs)
|
||||
|
||||
with pytest.raises(mgclient.DatabaseError):
|
||||
res = common.execute_and_fetch_all(
|
||||
cursor, "CALL mg.kafka_set_stream_offset('foo', 10)"
|
||||
)
|
||||
|
||||
def comparison_check(a, b):
|
||||
return a == str(b).strip("'(,)")
|
||||
|
||||
res = execute_set_offset_and_consume(10, messages[10:])
|
||||
assert len(res) == 10
|
||||
assert all([comparison_check(a, b) for a, b in zip(messages[10:], res)])
|
||||
common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
|
||||
|
||||
res = execute_set_offset_and_consume(-1, messages)
|
||||
assert len(res) == len(messages)
|
||||
assert all([comparison_check(a, b) for a, b in zip(messages, res)])
|
||||
res = common.execute_and_fetch_all(cursor, "MATCH (n) return n.offset")
|
||||
assert all([comparison_check(str(i), res[i]) for i in range(1, 20)])
|
||||
res = common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
|
||||
|
||||
res = execute_set_offset_and_consume(-2, [])
|
||||
assert len(res) == 0
|
||||
last_msg = "Final Message"
|
||||
producer.send(topics[0], last_msg.encode()).get(timeout=60)
|
||||
res = consume([last_msg])
|
||||
assert len(res) == 1
|
||||
assert comparison_check("Final Message", res[0])
|
||||
common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
@@ -29,7 +29,7 @@ def simple(
CREATE (n:MESSAGE {{
timestamp: '{message.timestamp()}',
payload: '{payload_as_str}',
offset: {message.offset()}
offset: '{message.offset()}',
topic: '{message.topic_name()}'
}})""",
parameters=None))
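The simple transformation above interpolates the message fields straight into the generated openCypher text, which is why the offset value is now quoted. The with_parameters variant exercised elsewhere in these e2e tests passes the same fields as query parameters instead; its exact contents are not shown in this commit, so the following is only a sketch matching what the CHECK STREAM assertions expect.

import mgp

@mgp.transformation
def with_parameters(messages: mgp.Messages
                    ) -> mgp.Record(query=str, parameters=mgp.Nullable[mgp.Map]):
    result = []
    for i in range(messages.total_messages()):
        message = messages.message_at(i)
        # Hand the message fields over as parameters rather than splicing
        # them into the query string.
        result.append(mgp.Record(
            query="CREATE (n:MESSAGE "
                  "{timestamp: $timestamp, payload: $payload, "
                  "topic: $topic, offset: $offset})",
            parameters={
                "timestamp": message.timestamp(),
                "payload": message.payload().decode("utf-8"),
                "topic": message.topic_name(),
                "offset": message.offset(),
            }))
    return result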