Merge branch 'master' into add-kill-to-interactive-runner

János Benjamin Antal 2022-06-21 08:53:10 +02:00 committed by GitHub
commit 9fca21f508
31 changed files with 1348 additions and 234 deletions

View File

@ -17,6 +17,7 @@
namespace memgraph::integrations {
inline constexpr int64_t kDefaultCheckBatchLimit{1};
inline constexpr int64_t kMinimumStartBatchLimit{1};
inline constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
inline constexpr std::chrono::milliseconds kMinimumInterval{1};
inline constexpr int64_t kMinimumSize{1};

View File

@ -74,6 +74,36 @@ utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaCon
return std::move(batch);
}
void CheckAndDestroyLastAssignmentIfNeeded(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info,
std::vector<RdKafka::TopicPartition *> &last_assignment) {
if (!last_assignment.empty()) {
if (const auto err = consumer.assign(last_assignment); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerStartFailedException(info.consumer_name,
fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err)));
}
RdKafka::TopicPartition::destroy(last_assignment);
}
}
void TryToConsumeBatch(RdKafka::KafkaConsumer &consumer, const ConsumerInfo &info,
const ConsumerFunction &consumer_function, const std::vector<Message> &batch) {
consumer_function(batch);
std::vector<RdKafka::TopicPartition *> partitions;
utils::OnScopeExit clear_partitions([&]() { RdKafka::TopicPartition::destroy(partitions); });
if (const auto err = consumer.assignment(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(
info.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err)));
}
if (const auto err = consumer.position(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(info.consumer_name,
fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err)));
}
if (const auto err = consumer.commitSync(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCommitFailedException(info.consumer_name, RdKafka::err2str(err));
}
}
} // namespace
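
The utils::OnScopeExit guard above is what keeps `partitions` from leaking when one of the throws fires. A minimal sketch of the idiom, assuming utils::OnScopeExit behaves like a standard scope guard (illustrative, not the actual memgraph implementation):

#include <utility>

// Runs the stored callable when the guard goes out of scope, on both the
// normal and the exception path; assumed to match utils::OnScopeExit.
template <typename TCallable>
class ScopeGuardSketch {
 public:
  explicit ScopeGuardSketch(TCallable callable) : callable_(std::move(callable)) {}
  ~ScopeGuardSketch() { callable_(); }
  ScopeGuardSketch(const ScopeGuardSketch &) = delete;
  ScopeGuardSketch &operator=(const ScopeGuardSketch &) = delete;

 private:
  TCallable callable_;
};
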
Message::Message(std::unique_ptr<RdKafka::Message> &&message) : message_{std::move(message)} {
@ -221,10 +251,21 @@ void Consumer::Start() {
StartConsuming();
}
void Consumer::StartIfStopped() {
if (!is_running_) {
StartConsuming();
void Consumer::StartWithLimit(const uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
if (limit_batches < kMinimumStartBatchLimit) {
throw ConsumerStartFailedException(
info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", kMinimumStartBatchLimit));
}
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerStartFailedException(
info_.consumer_name,
fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count()));
}
StartConsumingWithLimit(limit_batches, timeout);
}
void Consumer::Stop() {
@ -244,7 +285,7 @@ void Consumer::StopIfRunning() {
}
}
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
@ -344,13 +385,7 @@ void Consumer::StartConsuming() {
is_running_.store(true);
if (!last_assignment_.empty()) {
if (const auto err = consumer_->assign(last_assignment_); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerStartFailedException(info_.consumer_name,
fmt::format("Couldn't restore commited offsets: '{}'", RdKafka::err2str(err)));
}
RdKafka::TopicPartition::destroy(last_assignment_);
}
CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_);
thread_ = std::thread([this] {
static constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize();
@ -361,33 +396,18 @@ void Consumer::StartConsuming() {
while (is_running_) {
auto maybe_batch = GetBatch(*consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
break;
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) continue;
if (batch.empty()) {
continue;
}
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
try {
consumer_function_(batch);
std::vector<RdKafka::TopicPartition *> partitions;
utils::OnScopeExit clear_partitions([&]() { RdKafka::TopicPartition::destroy(partitions); });
if (const auto err = consumer_->assignment(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCheckFailedException(
info_.consumer_name, fmt::format("Couldn't get assignment to commit offsets: {}", RdKafka::err2str(err)));
}
if (const auto err = consumer_->position(partitions); err != RdKafka::ERR_NO_ERROR) {
throw ConsumerCheckFailedException(
info_.consumer_name, fmt::format("Couldn't get offsets from librdkafka {}", RdKafka::err2str(err)));
}
if (const auto err = consumer_->commitSync(partitions); err != RdKafka::ERR_NO_ERROR) {
spdlog::warn("Committing offset of consumer {} failed: {}", info_.consumer_name, RdKafka::err2str(err));
break;
}
TryToConsumeBatch(*consumer_, info_, consumer_function_, batch);
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
@ -398,6 +418,44 @@ void Consumer::StartConsuming() {
});
}
void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
MG_ASSERT(!is_running_, "Cannot start already running consumer!");
if (is_running_.exchange(true)) {
throw ConsumerRunningException(info_.consumer_name);
}
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
CheckAndDestroyLastAssignmentIfNeeded(*consumer_, info_, last_assignment_);
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
for (uint64_t batch_count = 0; batch_count < limit_batches;) {
const auto now = std::chrono::steady_clock::now();
if (now - start >= timeout_to_use) {
throw ConsumerStartFailedException(info_.consumer_name, "Timeout reached");
}
const auto maybe_batch = GetBatch(*consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++batch_count;
spdlog::info("Kafka consumer {} is processing a batch", info_.consumer_name);
TryToConsumeBatch(*consumer_, info_, consumer_function_, batch);
spdlog::info("Kafka consumer {} finished processing", info_.consumer_name);
}
}
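
The loop above combines two stop conditions: a deadline on the steady clock and a count of non-empty batches. The same control flow in isolation, with the Kafka specifics stripped out (names here are illustrative, not part of the diff):

#include <chrono>
#include <cstdint>
#include <functional>
#include <stdexcept>

// Runs until `limit` successful polls or until `timeout` elapses, whichever
// comes first; an empty poll advances time but not the counter.
inline void RunBounded(uint64_t limit, std::chrono::milliseconds timeout,
                       const std::function<bool()> &poll_one /* false on an empty poll */) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  for (uint64_t done = 0; done < limit;) {
    if (std::chrono::steady_clock::now() >= deadline) {
      throw std::runtime_error("Timeout reached");
    }
    if (poll_one()) {
      ++done;
    }
  }
}
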
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();

View File

@ -113,11 +113,19 @@ class Consumer final : public RdKafka::EventCb {
/// This method will start a new thread which will poll all the topics for messages.
///
/// @throws ConsumerRunningException if the consumer is already running
/// @throws ConsumerStartFailedException if the committed offsets cannot be restored
void Start();
/// Starts consuming messages if it is not started already.
/// Starts consuming messages.
///
void StartIfStopped();
/// This method will poll the topics for messages on the calling thread, blocking until the batch limit is reached or the timeout expires.
///
/// @param limit_batches the consumer will only consume the given number of batches.
/// @param timeout the maximum duration during which the command should run.
///
/// @throws ConsumerRunningException if the consumer is already running
/// @throws ConsumerStartFailedException if the committed offsets cannot be restored
void StartWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
/// Stops consuming messages.
///
@ -136,9 +144,9 @@ class Consumer final : public RdKafka::EventCb {
/// used.
/// @param check_consumer_function a function to feed the received messages in, only used during this dry-run.
///
/// @throws ConsumerRunningException if the consumer is alredy running.
/// @throws ConsumerRunningException if the consumer is already running.
/// @throws ConsumerCheckFailedException if check isn't successful.
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
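
Taken together, the three entry points differ mainly in blocking behavior: Start returns after spawning the polling thread, StartWithLimit blocks the calling thread until the batch limit or the timeout is hit and then stops itself, and Check dry-runs batches through a separate callback. A hypothetical call site (ExampleUsage and the literals are illustrative, not part of the diff):

#include <chrono>
#include "integrations/kafka/consumer.hpp"

void ExampleUsage(memgraph::integrations::kafka::Consumer &consumer) {
  using namespace std::chrono_literals;
  consumer.Start();          // non-blocking: polling happens on a new thread
  consumer.StopIfRunning();  // stop again so the bounded start below can run
  // Blocks until 10 batches were consumed or 30 seconds elapsed.
  consumer.StartWithLimit(10, 30000ms);
  // Dry run: feeds up to 5 batches into the callback within 5 seconds.
  consumer.Check(5000ms, 5, [](const auto &messages) { /* inspect messages */ });
}
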
/// Returns true if the consumer is actively consuming messages.
@ -157,6 +165,7 @@ class Consumer final : public RdKafka::EventCb {
void event_cb(RdKafka::Event &event) override;
void StartConsuming();
void StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void StopConsuming();
@ -178,7 +187,6 @@ class Consumer final : public RdKafka::EventCb {
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
mutable std::vector<RdKafka::TopicPartition *> last_assignment_; // Protected by is_running_
std::optional<int64_t> limit_batches_{std::nullopt};
std::unique_ptr<RdKafka::KafkaConsumer, std::function<void(RdKafka::KafkaConsumer *)>> consumer_;
std::thread thread_;
ConsumerRebalanceCb cb_;

View File

@ -64,4 +64,16 @@ class TopicNotFoundException : public KafkaStreamException {
TopicNotFoundException(const std::string_view consumer_name, const std::string_view topic_name)
: KafkaStreamException("Kafka consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
class ConsumerCommitFailedException : public KafkaStreamException {
public:
ConsumerCommitFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Committing offset of consumer {} failed: {}", consumer_name, error) {}
};
class ConsumerReadMessagesFailedException : public KafkaStreamException {
public:
ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error)
: KafkaStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {}
};
} // namespace memgraph::integrations::kafka

View File

@ -11,13 +11,14 @@
#include "integrations/pulsar/consumer.hpp"
#include <algorithm>
#include <chrono>
#include <thread>
#include <fmt/format.h>
#include <pulsar/Client.h>
#include <pulsar/InitialPosition.h>
#include <chrono>
#include <thread>
#include "integrations/constants.hpp"
#include "integrations/pulsar/exceptions.hpp"
#include "utils/concepts.hpp"
@ -33,6 +34,10 @@ namespace {
template <typename T>
concept PulsarConsumer = utils::SameAsAnyOf<T, pulsar_client::Consumer, pulsar_client::Reader>;
template <typename TFunc>
concept PulsarMessageGetter =
std::same_as<const pulsar_client::Message &, std::invoke_result_t<TFunc, const Message &>>;
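
These two concepts gate the shared TryToConsumeBatch template below: the first admits exactly the two pulsar handle types, and the second requires the getter to return the underlying message by const reference, since std::same_as allows no conversions. A compile-time illustration, assuming utils::SameAsAnyOf means "same as any of the listed types":

static_assert(PulsarConsumer<pulsar_client::Consumer>);
static_assert(PulsarConsumer<pulsar_client::Reader>);
static_assert(!PulsarConsumer<int>);  // anything else is rejected at compile time
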
pulsar_client::Result ConsumeMessage(pulsar_client::Consumer &consumer, pulsar_client::Message &message,
int remaining_timeout_in_ms) {
return consumer.receive(message, remaining_timeout_in_ms);
@ -97,6 +102,26 @@ pulsar_client::Client CreateClient(const std::string &service_url) {
conf.setLogger(new SpdlogLoggerFactory);
return {service_url, conf};
}
template <PulsarConsumer TConsumer, PulsarMessageGetter TPulsarMessageGetter>
void TryToConsumeBatch(TConsumer &consumer, const ConsumerInfo &info, const ConsumerFunction &consumer_function,
pulsar_client::MessageId &last_message_id, const std::vector<Message> &batch,
const TPulsarMessageGetter &message_getter) {
consumer_function(batch);
auto has_message_failed = [&consumer, &info, &last_message_id, &message_getter](const auto &message) {
if (const auto result = consumer.acknowledge(message_getter(message)); result != pulsar_client::ResultOk) {
spdlog::warn("Acknowledging a message of consumer {} failed: {}", info.consumer_name, result);
return true;
}
last_message_id = message_getter(message).getMessageId();
return false;
};
if (std::ranges::any_of(batch, has_message_failed)) {
throw ConsumerAcknowledgeMessagesFailedException(info.consumer_name);
}
}
} // namespace
Message::Message(pulsar_client::Message &&message) : message_{std::move(message)} {}
@ -137,6 +162,24 @@ void Consumer::Start() {
StartConsuming();
}
void Consumer::StartWithLimit(const uint64_t limit_batches,
const std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
if (limit_batches < kMinimumStartBatchLimit) {
throw ConsumerStartFailedException(
info_.consumer_name, fmt::format("Batch limit has to be greater than or equal to {}", kMinimumStartBatchLimit));
}
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerStartFailedException(
info_.consumer_name,
fmt::format("Timeout has to be greater than or equal to {} milliseconds", kMinimumInterval.count()));
}
StartConsumingWithLimit(limit_batches, timeout);
}
void Consumer::Stop() {
if (!is_running_) {
throw ConsumerStoppedException(info_.consumer_name);
@ -154,7 +197,7 @@ void Consumer::StopIfRunning() {
}
}
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
@ -240,9 +283,7 @@ void Consumer::StartConsuming() {
auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_);
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
break;
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
@ -254,18 +295,8 @@ void Consumer::StartConsuming() {
spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name);
try {
consumer_function_(batch);
if (std::any_of(batch.begin(), batch.end(), [&](const auto &message) {
if (const auto result = consumer_.acknowledge(message.message_); result != pulsar_client::ResultOk) {
spdlog::warn("Acknowledging a message of consumer {} failed: {}", info_.consumer_name, result);
return true;
}
last_message_id_ = message.message_.getMessageId();
return false;
})) {
break;
}
TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch,
[&](const Message &message) -> const pulsar_client::Message & { return message.message_; });
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
@ -277,6 +308,43 @@ void Consumer::StartConsuming() {
});
}
void Consumer::StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const {
if (is_running_.exchange(true)) {
throw ConsumerRunningException(info_.consumer_name);
}
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
for (uint64_t batch_count = 0; batch_count < limit_batches;) {
const auto now = std::chrono::steady_clock::now();
if (now - start >= timeout_to_use) {
throw ConsumerCheckFailedException(info_.consumer_name, "Timeout reached");
}
const auto maybe_batch = GetBatch(consumer_, info_, is_running_, last_message_id_);
if (maybe_batch.HasError()) {
throw ConsumerReadMessagesFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++batch_count;
spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name);
TryToConsumeBatch(consumer_, info_, consumer_function_, last_message_id_, batch,
[](const Message &message) -> const pulsar_client::Message & { return message.message_; });
spdlog::info("Pulsar consumer {} finished processing", info_.consumer_name);
}
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) {

View File

@ -58,25 +58,27 @@ class Consumer final {
bool IsRunning() const;
void Start();
void StartWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
void StopIfRunning();
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
const ConsumerInfo &Info() const;
private:
void StartConsuming();
void StartConsumingWithLimit(uint64_t limit_batches, std::optional<std::chrono::milliseconds> timeout) const;
void StopConsuming();
ConsumerInfo info_;
mutable pulsar_client::Client client_;
pulsar_client::Consumer consumer_;
mutable pulsar_client::Consumer consumer_;
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()};
mutable pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()}; // Protected by is_running_
std::thread thread_;
};
} // namespace memgraph::integrations::pulsar

View File

@ -55,4 +55,16 @@ class TopicNotFoundException : public PulsarStreamException {
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
: PulsarStreamException("Pulsar consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
class ConsumerReadMessagesFailedException : public PulsarStreamException {
public:
ConsumerReadMessagesFailedException(const std::string_view consumer_name, const std::string_view error)
: PulsarStreamException("Error happened in consumer {} while fetching messages: {}", consumer_name, error) {}
};
class ConsumerAcknowledgeMessagesFailedException : public PulsarStreamException {
public:
explicit ConsumerAcknowledgeMessagesFailedException(const std::string_view consumer_name)
: PulsarStreamException("Acknowledging a message of consumer {} has failed!", consumer_name) {}
};
} // namespace memgraph::integrations::pulsar

View File

@ -2391,6 +2391,9 @@ cpp<#
(lcp:define-enum sync-mode
(sync async)
(:serialize))
(lcp:define-enum replica-state
(ready replicating recovery invalid)
(:serialize))
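
From this declaration LCP presumably generates a nested C++ enum along these lines (a sketch inferred from the usage in interpreter.cpp further below; the serialization boilerplate is elided):

enum class ReplicaState { READY, REPLICATING, RECOVERY, INVALID };  // nested in ReplicationQuery
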
#>cpp
ReplicationQuery() = default;

View File

@ -775,6 +775,23 @@ antlrcpp::Any CypherMainVisitor::visitDropStream(MemgraphCypher::DropStreamConte
antlrcpp::Any CypherMainVisitor::visitStartStream(MemgraphCypher::StartStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::START_STREAM;
if (ctx->BATCH_LIMIT()) {
if (!ctx->batchLimit->numberLiteral() || !ctx->batchLimit->numberLiteral()->integerLiteral()) {
throw SemanticException("Batch limit should be an integer literal!");
}
stream_query->batch_limit_ = ctx->batchLimit->accept(this);
}
if (ctx->TIMEOUT()) {
if (!ctx->timeout->numberLiteral() || !ctx->timeout->numberLiteral()->integerLiteral()) {
throw SemanticException("Timeout should be an integer literal!");
}
if (!ctx->BATCH_LIMIT()) {
throw SemanticException("Parameter TIMEOUT can only be defined if BATCH_LIMIT is defined");
}
stream_query->timeout_ = ctx->timeout->accept(this);
}
stream_query->stream_name_ = ctx->streamName()->symbolicName()->accept(this).as<std::string>();
return stream_query;
}
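
The two semantic checks above translate into the following accepted and rejected forms (the stream name is illustrative; the syntax comes from the grammar change below):

// Valid:    START STREAM myStream
// Valid:    START STREAM myStream BATCH_LIMIT 10
// Valid:    START STREAM myStream BATCH_LIMIT 10 TIMEOUT 5000
// Rejected: START STREAM myStream TIMEOUT 5000      (TIMEOUT requires BATCH_LIMIT)
// Rejected: START STREAM myStream BATCH_LIMIT 2.5   (batch limit must be an integer literal)
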

View File

@ -351,7 +351,7 @@ pulsarCreateStream : CREATE PULSAR STREAM streamName ( pulsarCreateStreamConfig
dropStream : DROP STREAM streamName ;
startStream : START STREAM streamName ;
startStream : START STREAM streamName ( BATCH_LIMIT batchLimit=literal ) ? ( TIMEOUT timeout=literal ) ? ;
startAllStreams : START ALL STREAMS ;

View File

@ -231,7 +231,20 @@ class ReplQueryHandler final : public query::ReplicationQueryHandler {
if (repl_info.timeout) {
replica.timeout = *repl_info.timeout;
}
switch (repl_info.state) {
case storage::replication::ReplicaState::READY:
replica.state = ReplicationQuery::ReplicaState::READY;
break;
case storage::replication::ReplicaState::REPLICATING:
replica.state = ReplicationQuery::ReplicaState::REPLICATING;
break;
case storage::replication::ReplicaState::RECOVERY:
replica.state = ReplicationQuery::ReplicaState::RECOVERY;
break;
case storage::replication::ReplicaState::INVALID:
replica.state = ReplicationQuery::ReplicaState::INVALID;
break;
}
return replica;
};
@ -486,6 +499,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
fmt::format("Replica {} is registered.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::DROP_REPLICA: {
const auto &name = repl_query->replica_name_;
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, name]() mutable {
@ -496,8 +510,9 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
fmt::format("Replica {} is dropped.", repl_query->replica_name_));
return callback;
}
case ReplicationQuery::Action::SHOW_REPLICAS: {
callback.header = {"name", "socket_address", "sync_mode", "timeout"};
callback.header = {"name", "socket_address", "sync_mode", "timeout", "state"};
callback.fn = [handler = ReplQueryHandler{interpreter_context->db}, replica_nfields = callback.header.size()] {
const auto &replicas = handler.ShowReplicas();
auto typed_replicas = std::vector<std::vector<TypedValue>>{};
@ -508,6 +523,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue(replica.name));
typed_replica.emplace_back(TypedValue(replica.socket_address));
switch (replica.sync_mode) {
case ReplicationQuery::SyncMode::SYNC:
typed_replica.emplace_back(TypedValue("sync"));
@ -516,12 +532,28 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
typed_replica.emplace_back(TypedValue("async"));
break;
}
if (replica.timeout) {
typed_replica.emplace_back(TypedValue(*replica.timeout));
} else {
typed_replica.emplace_back(TypedValue());
}
switch (replica.state) {
case ReplicationQuery::ReplicaState::READY:
typed_replica.emplace_back(TypedValue("ready"));
break;
case ReplicationQuery::ReplicaState::REPLICATING:
typed_replica.emplace_back(TypedValue("replicating"));
break;
case ReplicationQuery::ReplicaState::RECOVERY:
typed_replica.emplace_back(TypedValue("recovery"));
break;
case ReplicationQuery::ReplicaState::INVALID:
typed_replica.emplace_back(TypedValue("invalid"));
break;
}
typed_replicas.emplace_back(std::move(typed_replica));
}
return typed_replicas;
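
With the two new columns wired in, the rows produced by this callback look like the following (values taken from the e2e test expectations later in this diff):

// name       socket_address   sync_mode  timeout  state
// replica_1  127.0.0.1:10001  sync       0        ready
// replica_2  127.0.0.1:10002  sync       1.0      ready
// replica_3  127.0.0.1:10003  async      null     ready
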
@ -655,12 +687,26 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
return callback;
}
case StreamQuery::Action::START_STREAM: {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
const auto timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator);
if (batch_limit.has_value()) {
if (batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_, batch_limit, timeout]() {
interpreter_context->streams.StartWithLimit(stream_name, static_cast<uint64_t>(batch_limit.value()), timeout);
return std::vector<std::vector<TypedValue>>{};
};
} else {
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_]() {
interpreter_context->streams.Start(stream_name);
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::START_STREAM,
fmt::format("Started stream {}.", stream_query->stream_name_));
}
return callback;
}
case StreamQuery::Action::START_ALL_STREAMS: {
@ -730,9 +776,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
}
case StreamQuery::Action::CHECK_STREAM: {
callback.header = {"queries", "raw messages"};
const auto batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator);
if (batch_limit.has_value() && batch_limit.value() < 0) {
throw utils::BasicException("Parameter BATCH_LIMIT cannot hold negative value");
}
callback.fn = [interpreter_context, stream_name = stream_query->stream_name_,
timeout = GetOptionalValue<std::chrono::milliseconds>(stream_query->timeout_, evaluator),
batch_limit = GetOptionalValue<int64_t>(stream_query->batch_limit_, evaluator)]() mutable {
batch_limit]() mutable {
return interpreter_context->streams.Check(stream_name, timeout, batch_limit);
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CHECK_STREAM,

View File

@ -127,6 +127,7 @@ class ReplicationQueryHandler {
std::string socket_address;
ReplicationQuery::SyncMode sync_mode;
std::optional<double> timeout;
ReplicationQuery::ReplicaState state;
};
/// @throw QueryRuntimeException if an error occurred.

View File

@ -52,10 +52,11 @@ concept Stream = requires(TStream stream) {
typename TStream::Message;
TStream{std::string{""}, typename TStream::StreamInfo{}, ConsumerFunction<typename TStream::Message>{}};
{ stream.Start() } -> std::same_as<void>;
{ stream.StartWithLimit(uint64_t{}, std::optional<std::chrono::milliseconds>{}) } -> std::same_as<void>;
{ stream.Stop() } -> std::same_as<void>;
{ stream.IsRunning() } -> std::same_as<bool>;
{
stream.Check(std::optional<std::chrono::milliseconds>{}, std::optional<int64_t>{},
stream.Check(std::optional<std::chrono::milliseconds>{}, std::optional<uint64_t>{},
ConsumerFunction<typename TStream::Message>{})
} -> std::same_as<void>;
requires std::same_as<std::decay_t<decltype(std::declval<typename TStream::StreamInfo>().common_info)>,

View File

@ -44,10 +44,13 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
}
void KafkaStream::Start() { consumer_->Start(); }
void KafkaStream::StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const {
consumer_->StartWithLimit(batch_limit, timeout);
}
void KafkaStream::Stop() { consumer_->Stop(); }
bool KafkaStream::IsRunning() const { return consumer_->IsRunning(); }
void KafkaStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void KafkaStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<integrations::kafka::Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}
@ -106,10 +109,12 @@ PulsarStream::StreamInfo PulsarStream::Info(std::string transformation_name) con
}
void PulsarStream::Start() { consumer_->Start(); }
void PulsarStream::StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const {
consumer_->StartWithLimit(batch_limit, timeout);
}
void PulsarStream::Stop() { consumer_->Stop(); }
bool PulsarStream::IsRunning() const { return consumer_->IsRunning(); }
void PulsarStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void PulsarStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}

View File

@ -36,10 +36,11 @@ struct KafkaStream {
StreamInfo Info(std::string transformation_name) const;
void Start();
void StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
utils::BasicResult<std::string> SetStreamOffset(int64_t offset);
@ -71,10 +72,11 @@ struct PulsarStream {
StreamInfo Info(std::string transformation_name) const;
void Start();
void StartWithLimit(uint64_t batch_limit, std::optional<std::chrono::milliseconds> timeout) const;
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<uint64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
private:

View File

@ -456,7 +456,7 @@ void Streams::Create(const std::string &stream_name, typename TStream::StreamInf
try {
std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
const auto stream_source_ptr = stream_data.stream_source->ReadLock();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
},
@ -575,7 +575,7 @@ void Streams::RestoreStreams() {
auto it = CreateConsumer<T>(*locked_streams_map, stream_name, std::move(status.info), std::move(status.owner));
if (status.is_running) {
std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
},
@ -617,7 +617,7 @@ void Streams::Drop(const std::string &stream_name) {
// function can be executing with the consumer, nothing else.
// By acquiring the write lock here for the consumer, we make sure there is
// no running Test function for this consumer, therefore it can be erased.
std::visit([&](auto &&stream_data) { stream_data.stream_source->Lock(); }, it->second);
std::visit([&](const auto &stream_data) { stream_data.stream_source->Lock(); }, it->second);
locked_streams->erase(it);
if (!storage_.Delete(stream_name)) {
@ -632,7 +632,7 @@ void Streams::Start(const std::string &stream_name) {
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
[&, this](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Start();
Persist(CreateStatus(stream_name, stream_data.transformation_name, stream_data.owner, *stream_source_ptr));
@ -640,12 +640,27 @@ void Streams::Start(const std::string &stream_name) {
it->second);
}
void Streams::StartWithLimit(const std::string &stream_name, uint64_t batch_limit,
std::optional<std::chrono::milliseconds> timeout) const {
std::optional locked_streams{streams_.ReadLock()};
auto it = GetStream(**locked_streams, stream_name);
std::visit(
[&](const auto &stream_data) {
const auto locked_stream_source = stream_data.stream_source->ReadLock();
locked_streams.reset();
locked_stream_source->StartWithLimit(batch_limit, timeout);
},
it->second);
}
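
StartWithLimit above holds the streams-map read lock only long enough to take the per-stream read lock, then drops it before the long blocking call, so other streams stay accessible while this one waits. The shape of that hand-over, with plain shared mutexes standing in for the Synchronized wrappers (illustrative names):

#include <functional>
#include <mutex>
#include <optional>
#include <shared_mutex>

inline void BlockOnEntry(std::shared_mutex &map_mutex, std::shared_mutex &entry_mutex,
                         const std::function<void()> &blocking_call) {
  std::optional<std::shared_lock<std::shared_mutex>> map_lock{std::in_place, map_mutex};
  std::shared_lock entry_lock{entry_mutex};  // taken while the map is still locked
  map_lock.reset();                          // release the map before blocking
  blocking_call();                           // may run until the batch limit or timeout
}
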
void Streams::Stop(const std::string &stream_name) {
auto locked_streams = streams_.Lock();
auto it = GetStream(*locked_streams, stream_name);
std::visit(
[&, this](auto &&stream_data) {
[&, this](const auto &stream_data) {
auto stream_source_ptr = stream_data.stream_source->Lock();
stream_source_ptr->Stop();
@ -657,7 +672,7 @@ void Streams::Stop(const std::string &stream_name) {
void Streams::StartAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
[&stream_name = stream_name, this](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (!locked_stream_source->IsRunning()) {
locked_stream_source->Start();
@ -672,7 +687,7 @@ void Streams::StartAll() {
void Streams::StopAll() {
for (auto locked_streams = streams_.Lock(); auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&stream_name = stream_name, this](auto &&stream_data) {
[&stream_name = stream_name, this](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->Lock();
if (locked_stream_source->IsRunning()) {
locked_stream_source->Stop();
@ -689,7 +704,7 @@ std::vector<StreamStatus<>> Streams::GetStreamInfo() const {
{
for (auto locked_streams = streams_.ReadLock(); const auto &[stream_name, stream_data] : *locked_streams) {
std::visit(
[&, &stream_name = stream_name](auto &&stream_data) {
[&, &stream_name = stream_name](const auto &stream_data) {
auto locked_stream_source = stream_data.stream_source->ReadLock();
auto info = locked_stream_source->Info(stream_data.transformation_name);
result.emplace_back(StreamStatus<>{stream_name, StreamType(*locked_stream_source),
@ -703,12 +718,12 @@ std::vector<StreamStatus<>> Streams::GetStreamInfo() const {
}
TransformationResult Streams::Check(const std::string &stream_name, std::optional<std::chrono::milliseconds> timeout,
std::optional<int64_t> batch_limit) const {
std::optional<uint64_t> batch_limit) const {
std::optional locked_streams{streams_.ReadLock()};
auto it = GetStream(**locked_streams, stream_name);
return std::visit(
[&](auto &&stream_data) {
[&](const auto &stream_data) {
// This depends on the fact that Drop will first acquire a write lock to the consumer, and erase it only after
// that
const auto locked_stream_source = stream_data.stream_source->ReadLock();

View File

@ -115,6 +115,17 @@ class Streams final {
/// @throws ConsumerRunningException if the consumer is already running
void Start(const std::string &stream_name);
/// Start consuming from a stream.
///
/// @param stream_name name of the stream that needs to be started
/// @param batch_limit number of batches we want to consume before stopping
/// @param timeout the maximum duration during which the command should run.
///
/// @throws StreamsException if the stream doesn't exist
/// @throws ConsumerRunningException if the consumer is already running
void StartWithLimit(const std::string &stream_name, uint64_t batch_limit,
std::optional<std::chrono::milliseconds> timeout) const;
/// Stop consuming from a stream.
///
/// @param stream_name name of the stream that needs to be stopped
@ -142,6 +153,7 @@ class Streams final {
///
/// @param stream_name name of the stream we want to test
/// @param batch_limit number of batches we want to test before stopping
/// @param timeout the maximum duration during which the command should run.
///
/// @returns A vector of vectors of TypedValue. Each subvector contains two elements, the query string and the
/// nullable parameters map.
@ -151,7 +163,7 @@ class Streams final {
/// @throws ConsumerCheckFailedException if the transformation function throws any std::exception during processing
TransformationResult Check(const std::string &stream_name,
std::optional<std::chrono::milliseconds> timeout = std::nullopt,
std::optional<int64_t> batch_limit = std::nullopt) const;
std::optional<uint64_t> batch_limit = std::nullopt) const;
private:
template <Stream TStream>

View File

@ -306,6 +306,13 @@ Storage::Storage(Config config)
uuid_(utils::GenerateUUID()),
epoch_id_(utils::GenerateUUID()),
global_locker_(file_retainer_.AddLocker()) {
if (config_.durability.snapshot_wal_mode == Config::Durability::SnapshotWalMode::DISABLED &&
replication_role_ == ReplicationRole::MAIN) {
spdlog::warn(
"The instance has the MAIN replication role, but durability logs and snapshots are disabled. Please consider "
"enabling durability by using --storage-snapshot-interval-sec and --storage-wal-enabled flags because "
"without write-ahead logs this instance is not replicating any data.");
}
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::DISABLED ||
config_.durability.snapshot_on_exit || config_.durability.recover_on_startup) {
// Create the directory initially to crash the database in case of
@ -1879,13 +1886,22 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
const bool name_exists = replication_clients_.WithLock([&](auto &clients) {
return std::any_of(clients.begin(), clients.end(), [&](auto &client) { return client->Name() == name; });
return std::any_of(clients.begin(), clients.end(), [&name](const auto &client) { return client->Name() == name; });
});
if (name_exists) {
return RegisterReplicaError::NAME_EXISTS;
}
const auto end_point_exists = replication_clients_.WithLock([&endpoint](auto &clients) {
return std::any_of(clients.begin(), clients.end(),
[&endpoint](const auto &client) { return client->Endpoint() == endpoint; });
});
if (end_point_exists) {
return RegisterReplicaError::END_POINT_EXISTS;
}
MG_ASSERT(replication_mode == replication::ReplicationMode::SYNC || !config.timeout,
"Only SYNC mode can have a timeout set");
@ -1898,10 +1914,15 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
// Another thread could have added a client with same name while
// we were connecting to this client.
if (std::any_of(clients.begin(), clients.end(),
[&](auto &other_client) { return client->Name() == other_client->Name(); })) {
[&](const auto &other_client) { return client->Name() == other_client->Name(); })) {
return RegisterReplicaError::NAME_EXISTS;
}
if (std::any_of(clients.begin(), clients.end(),
[&client](const auto &other_client) { return client->Endpoint() == other_client->Endpoint(); })) {
return RegisterReplicaError::END_POINT_EXISTS;
}
clients.push_back(std::move(client));
return {};
});
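
RegisterReplica now checks the name and the endpoint twice: once up front to fail fast, and once more under the clients lock after the potentially slow connection setup, because another thread may have registered the same replica in between. The pattern reduced to its skeleton (illustrative container and names; the real code uses the WithLock wrapper shown above):

#include <algorithm>
#include <mutex>
#include <string>
#include <vector>

inline bool TryRegisterName(std::mutex &clients_mutex, std::vector<std::string> &client_names,
                            const std::string &name) {
  {
    std::lock_guard guard{clients_mutex};  // cheap pre-check: fail fast on duplicates
    if (std::find(client_names.begin(), client_names.end(), name) != client_names.end()) return false;
  }
  // ... slow connection setup runs here without holding the lock ...
  std::lock_guard guard{clients_mutex};  // authoritative re-check: a racer may have won
  if (std::find(client_names.begin(), client_names.end(), name) != client_names.end()) return false;
  client_names.push_back(name);
  return true;
}
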

View File

@ -411,7 +411,7 @@ class Storage final {
bool SetMainReplicationRole();
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, CONNECTION_FAILED };
enum class RegisterReplicaError : uint8_t { NAME_EXISTS, END_POINT_EXISTS, CONNECTION_FAILED };
/// @pre The instance should have a MAIN role
/// @pre Timeout can only be set for SYNC replication

View File

@ -1,25 +1,32 @@
# Set up C++ functions for e2e tests
function(add_query_module target_name src)
add_library(${target_name} SHARED ${src})
SET_TARGET_PROPERTIES(${target_name} PROPERTIES PREFIX "")
target_include_directories(${target_name} PRIVATE ${CMAKE_SOURCE_DIR}/include)
add_library(${target_name} SHARED ${src})
SET_TARGET_PROPERTIES(${target_name} PROPERTIES PREFIX "")
target_include_directories(${target_name} PRIVATE ${CMAKE_SOURCE_DIR}/include)
endfunction()
function(copy_e2e_python_files TARGET_PREFIX FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
endfunction()
function(copy_e2e_python_files_from_parent_folder TARGET_PREFIX EXTRA_PATH FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${EXTRA_PATH}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${EXTRA_PATH}/${FILE_NAME})
endfunction()
function(copy_e2e_cpp_files TARGET_PREFIX FILE_NAME)
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
add_custom_target(memgraph__e2e__${TARGET_PREFIX}__${FILE_NAME} ALL
COMMAND ${CMAKE_COMMAND} -E copy
${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME}
${CMAKE_CURRENT_BINARY_DIR}/${FILE_NAME}
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/${FILE_NAME})
endfunction()
add_subdirectory(server)

View File

@ -0,0 +1,219 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
# TODO(gitbuda): Add action to print the context/cluster.
# TODO(gitbuda): Add action to print logs of each Memgraph instance.
# TODO(gitbuda): Polish naming within script.
# TODO(gitbuda): Consider moving this somewhere higher in the project or even putting it inside GQLAlchemy.
# The idea here is to implement a simple interactive runner of Memgraph instances because:
# * it should be possible to manually create new test cases first
# by just running this script and executing commands manually from e.g. mgconsole;
# running a single instance of Memgraph is easy, but running multiple instances and
# controlling them is not that easy
# * it should be easy to create a new operational test without huge knowledge overhead,
# e.g. by calling `process_actions` from any e2e Python test; the test will contain the
# string with all actions and should run the test code in a different thread.
#
# NOTE: The intention here is not to provide infrastructure to write data
# correctness tests or any heavy workload; the intention is to be able to
# easily test e2e "operational" cases, simple cluster setups and basic Memgraph
# operational queries. For any kind of data correctness test, Jepsen or similar
# approaches have to be employed.
# NOTE: The instance description / context should be compatible with tests/e2e/runner.py
import atexit
import logging
import os
import subprocess
from argparse import ArgumentParser
from pathlib import Path
import time
import sys
from inspect import signature
import yaml
from memgraph import MemgraphInstanceRunner
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")
# Cluster description, injectable as the context.
# If the script argument is not provided, the following will be used as a default.
MEMGRAPH_INSTANCES_DESCRIPTION = {
"replica1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica1 SYNC TO '127.0.0.1:10001'",
"REGISTER REPLICA replica2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
],
},
}
MEMGRAPH_INSTANCES = {}
ACTIONS = {
"info": lambda context: info(context),
"stop": lambda context, name: stop(context, name),
"start": lambda context, name: start(context, name),
"sleep": lambda context, delta: time.sleep(float(delta)),
"exit": lambda context: sys.exit(1),
"quit": lambda context: sys.exit(1),
}
log = logging.getLogger("memgraph.tests.e2e")
def load_args():
parser = ArgumentParser()
parser.add_argument("--actions", required=False, help="What actions to run", default="")
parser.add_argument(
"--context-yaml",
required=False,
help="YAML file with the cluster description",
default="",
)
return parser.parse_args()
def _start_instance(name, args, log_file, queries, use_ssl, procdir):
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
MEMGRAPH_INSTANCES[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", log_file)
binary_args = args + ["--log-file", log_file_path]
if len(procdir) != 0:
binary_args.append("--query-modules-directory=" + procdir)
mg_instance.start(args=binary_args)
for query in queries:
mg_instance.query(query)
return mg_instance
def stop_all():
for mg_instance in MEMGRAPH_INSTANCES.values():
mg_instance.stop()
def stop_instance(context, name):
for key, _ in context.items():
if key != name:
continue
MEMGRAPH_INSTANCES[name].stop()
def stop(context, name):
if name != "all":
stop_instance(context, name)
return
stop_all()
@atexit.register
def cleanup():
stop_all()
def start_instance(context, name, procdir):
mg_instances = {}
for key, value in context.items():
if key != name:
continue
args = value["args"]
log_file = value["log_file"]
queries = []
if "setup_queries" in value:
queries = value["setup_queries"]
use_ssl = False
if "ssl" in value:
use_ssl = bool(value["ssl"])
value.pop("ssl")
instance = _start_instance(name, args, log_file, queries, use_ssl, procdir)
mg_instances[name] = instance
assert len(mg_instances) == 1
return mg_instances
def start_all(context, procdir=""):
mg_instances = {}
for key, _ in context.items():
mg_instances.update(start_instance(context, key, procdir))
return mg_instances
def start(context, name, procdir=""):
if name != "all":
return start_instance(context, name, procdir)
return start_all(context)
def info(context):
print("{:<15s}{:>6s}".format("NAME", "STATUS"))
for name, _ in context.items():
if name not in MEMGRAPH_INSTANCES:
continue
instance = MEMGRAPH_INSTANCES[name]
print("{:<15s}{:>6s}".format(name, "UP" if instance.is_running() else "DOWN"))
def process_actions(context, actions):
actions = actions.split(" ")
actions.reverse()
while len(actions) > 0:
name = actions.pop()
action = ACTIONS[name]
args_no = len(signature(action).parameters) - 1
assert (
args_no >= 0
), "Wrong action definition, each action has to accept at least 1 argument which is the context."
assert args_no <= 1, "Actions with more than one user argument are not yet supported"
if args_no == 0:
action(context)
if args_no == 1:
action(context, actions.pop())
if __name__ == "__main__":
args = load_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(asctime)s %(name)s] %(message)s")
if args.context_yaml == "":
context = MEMGRAPH_INSTANCES_DESCRIPTION
else:
with open(args.context_yaml, "r") as f:
context = yaml.load(f, Loader=yaml.FullLoader)
if args.actions != "":
process_actions(context, args.actions)
sys.exit(0)
while True:
choice = input("ACTION>")
process_actions(context, choice)

View File

@ -9,3 +9,6 @@ target_link_libraries(memgraph__e2e__replication__read_write_benchmark gflags js
copy_e2e_python_files(replication_show common.py)
copy_e2e_python_files(replication_show conftest.py)
copy_e2e_python_files(replication_show show.py)
copy_e2e_python_files(replication_show show_while_creating_invalid_state.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." memgraph.py)
copy_e2e_python_files_from_parent_folder(replication_show ".." interactive_mg_runner.py)

View File

@ -12,6 +12,7 @@
import sys
import pytest
from common import execute_and_fetch_all
@ -26,21 +27,5 @@ def test_show_replication_role(port, role, connection):
assert data[0][0] == role
def test_show_replicas(connection):
cursor = connection(7687, "main").cursor()
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_column_names = {"name", "socket_address", "sync_mode", "timeout"}
actual_column_names = {x.name for x in cursor.description}
assert expected_column_names == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0),
("replica_2", "127.0.0.1:10002", "sync", 1.0),
("replica_3", "127.0.0.1:10003", "async", None),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -0,0 +1,121 @@
# Copyright 2022 Memgraph Ltd.
#
# Use of this software is governed by the Business Source License
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
# License, and you may not use this file except in compliance with the Business Source License.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0, included in the file
# licenses/APL.txt.
import sys
import atexit
import os
import pytest
import time
from common import execute_and_fetch_all
import interactive_mg_runner
interactive_mg_runner.SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
interactive_mg_runner.PROJECT_DIR = os.path.normpath(
os.path.join(interactive_mg_runner.SCRIPT_DIR, "..", "..", "..", "..")
)
interactive_mg_runner.BUILD_DIR = os.path.normpath(os.path.join(interactive_mg_runner.PROJECT_DIR, "build"))
interactive_mg_runner.MEMGRAPH_BINARY = os.path.normpath(os.path.join(interactive_mg_runner.BUILD_DIR, "memgraph"))
MEMGRAPH_INSTANCES_DESCRIPTION = {
"replica_1": {
"args": ["--bolt-port", "7688", "--log-level=TRACE"],
"log_file": "replica1.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
},
"replica_2": {
"args": ["--bolt-port", "7689", "--log-level=TRACE"],
"log_file": "replica2.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"],
},
"replica_3": {
"args": ["--bolt-port", "7690", "--log-level=TRACE"],
"log_file": "replica3.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"],
},
"replica_4": {
"args": ["--bolt-port", "7691", "--log-level=TRACE"],
"log_file": "replica4.log",
"setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10004;"],
},
"main": {
"args": ["--bolt-port", "7687", "--log-level=TRACE"],
"log_file": "main.log",
"setup_queries": [
"REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001';",
"REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002';",
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003';",
"REGISTER REPLICA replica_4 ASYNC TO '127.0.0.1:10004';",
],
},
}
def test_show_replicas(connection):
# The goal of this test is to check the SHOW REPLICAS command.
# 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes.
# 1/ We check that all replicas have the correct state: they should all be ready.
# 2/ We drop one replica. It should not appear anymore in the SHOW REPLICAS command.
# 3/ We kill another replica. It should become invalid in the SHOW REPLICAS command.
# 0/
atexit.register(
interactive_mg_runner.stop_all
) # Needed in case the test fails due to an assert. One still wants the instances to be stopped.
mg_instances = interactive_mg_runner.start_all(MEMGRAPH_INSTANCES_DESCRIPTION)
cursor = connection(7687, "main").cursor()
# 1/
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
EXPECTED_COLUMN_NAMES = {"name", "socket_address", "sync_mode", "timeout", "state"}
actual_column_names = {x.name for x in cursor.description}
assert EXPECTED_COLUMN_NAMES == actual_column_names
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_2", "127.0.0.1:10002", "sync", 1.0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
}
assert expected_data == actual_data
# 2/
execute_and_fetch_all(cursor, "DROP REPLICA replica_2")
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "ready"),
("replica_3", "127.0.0.1:10003", "async", None, "ready"),
("replica_4", "127.0.0.1:10004", "async", None, "ready"),
}
assert expected_data == actual_data
# 3/
mg_instances["replica_1"].kill()
mg_instances["replica_3"].kill()
mg_instances["replica_4"].stop()
# We leave some time for the main to realise the replicas are down.
time.sleep(2)
actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
expected_data = {
("replica_1", "127.0.0.1:10001", "sync", 0, "invalid"),
("replica_3", "127.0.0.1:10003", "async", None, "invalid"),
("replica_4", "127.0.0.1:10004", "async", None, "invalid"),
}
assert expected_data == actual_data
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -74,3 +74,7 @@ workloads:
"REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
]
validation_queries: []
- name: "Show while creating invalid state"
binary: "tests/e2e/pytest_runner.sh"
args: ["replication/show_while_creating_invalid_state.py"]

View File

@ -18,12 +18,11 @@ from pathlib import Path
import yaml
from memgraph import MemgraphInstanceRunner
import interactive_mg_runner
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")
log = logging.getLogger("memgraph.tests.e2e")
@ -58,30 +57,22 @@ def run(args):
for mg_instance in mg_instances.values():
mg_instance.stop()
for name, config in workload["cluster"].items():
use_ssl = False
if "ssl" in config:
use_ssl = bool(config["ssl"])
config.pop("ssl")
mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY, use_ssl)
mg_instances[name] = mg_instance
log_file_path = os.path.join(BUILD_DIR, "logs", config["log_file"])
binary_args = config["args"] + ["--log-file", log_file_path]
if "cluster" in workload:
procdir = ""
if "proc" in workload:
procdir = "--query-modules-directory=" + os.path.join(BUILD_DIR, workload["proc"])
binary_args.append(procdir)
mg_instance.start(args=binary_args)
for query in config.get("setup_queries", []):
mg_instance.query(query)
procdir = os.path.join(BUILD_DIR, workload["proc"])
mg_instances = interactive_mg_runner.start_all(workload["cluster"], procdir)
# Test.
mg_test_binary = os.path.join(BUILD_DIR, workload["binary"])
subprocess.run([mg_test_binary] + workload["args"], check=True, stderr=subprocess.STDOUT)
# Validation.
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
if "cluster" in workload:
for name, config in workload["cluster"].items():
for validation in config.get("validation_queries", []):
mg_instance = mg_instances[name]
data = mg_instance.query(validation["query"])[0][0]
assert data == validation["expected"]
cleanup()
log.info("%s PASSED.", workload_name)

View File

@ -10,6 +10,7 @@
# licenses/APL.txt.
import mgclient
import pytest
import time
from multiprocessing import Manager, Process, Value
@ -112,6 +113,13 @@ def start_stream(cursor, stream_name):
assert get_is_running(cursor, stream_name)
def start_stream_with_limit(cursor, stream_name, batch_limit, timeout=None):
if timeout is not None:
execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout} ")
else:
execute_and_fetch_all(cursor, f"START STREAM {stream_name} BATCH_LIMIT {batch_limit}")
def stop_stream(cursor, stream_name):
execute_and_fetch_all(cursor, f"STOP STREAM {stream_name}")
@ -253,10 +261,11 @@ def test_start_checked_stream_after_timeout(connection, stream_creator):
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator("test_stream"))
TIMEOUT_MS = 2000
TIMEOUT_IN_MS = 2000
TIMEOUT_IN_SECONDS = TIMEOUT_IN_MS / 1000
def call_check():
execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_MS}")
execute_and_fetch_all(connect().cursor(), f"CHECK STREAM test_stream TIMEOUT {TIMEOUT_IN_MS}")
check_stream_proc = Process(target=call_check, daemon=True)
@ -266,7 +275,7 @@ def test_start_checked_stream_after_timeout(connection, stream_creator):
start_stream(cursor, "test_stream")
end = time.time()
assert (end - start) < 1.3 * TIMEOUT_MS, "The START STREAM was blocked too long"
assert (end - start) < 1.3 * TIMEOUT_IN_SECONDS, "The START STREAM was blocked too long"
assert get_is_running(cursor, "test_stream")
stop_stream(cursor, "test_stream")
@ -401,3 +410,239 @@ def test_check_stream_different_number_of_queries_than_messages(connection, stre
assert expected_queries_and_raw_messages_1 == results.value[0]
assert expected_queries_and_raw_messages_2 == results.value[1]
assert expected_queries_and_raw_messages_3 == results.value[2]
def test_start_stream_with_batch_limit(connection, stream_creator, messages_sender):
STREAM_NAME = "test"
BATCH_LIMIT = 5
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
def start_new_stream_with_limit(stream_name, batch_limit):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit)
thread_stream_running = Process(target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT))
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
messages_sender(BATCH_LIMIT - 1)
# We have not sent enough batches to reach the limit. We check that the stream is still correctly running.
assert get_is_running(cursor, STREAM_NAME)
# We send one last message to reach the batch limit.
messages_sender(1)
time.sleep(2)
# We check that the stream has stopped correctly.
assert not get_is_running(cursor, STREAM_NAME)
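The fixed time.sleep(2) waits above can be flaky on a loaded CI machine. A polling helper along these lines would make the running-state checks more robust; this is a sketch only (the helper is hypothetical, not part of the change set) and assumes get_is_running from this module:
def wait_for_running_state(cursor, stream_name, expected, timeout_s=10.0, poll_s=0.2):
    # Poll the stream's is_running flag until it matches `expected` or the deadline passes.
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        if get_is_running(cursor, stream_name) == expected:
            return True
        time.sleep(poll_s)
    return False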
def test_start_stream_with_batch_limit_timeout(connection, stream_creator):
# We check that we get the expected exception when trying to run START STREAM while providing TIMEOUT and not BATCH_LIMIT
STREAM_NAME = "test"
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} TIMEOUT 3000")
def test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator):
# We check that we get the expected exception when START STREAM with BATCH_LIMIT and TIMEOUT reaches the timeout before BATCH_LIMIT batches arrive
STREAM_NAME = "test"
BATCH_LIMIT = 5
TIMEOUT = 3000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
BATCH_SIZE = 1  # assumed stream batch size
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME, BATCH_SIZE))
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"START STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (
end_time - start_time
) >= TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to something other than the timeout!"
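The start/end timing pattern above recurs in most of the tests below. A reusable timer could centralize it; the following is a sketch only (hypothetical helper, standard library imports):
from contextlib import contextmanager

@contextmanager
def assert_duration(min_s=None, max_s=None):
    # Time the wrapped block and assert the elapsed seconds fall within the given bounds.
    start = time.time()
    yield
    elapsed = time.time() - start
    if min_s is not None:
        assert elapsed >= min_s, f"finished too fast: {elapsed:.2f}s"
    if max_s is not None:
        assert elapsed <= max_s, f"took too long: {elapsed:.2f}s"
With it, the assertion above would read: with assert_duration(min_s=TIMEOUT_IN_SECONDS): followed by the pytest.raises block.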
def test_start_stream_with_batch_limit_while_check_running(
connection, stream_creator, message_sender, setup_function=None
):
# 1/ We check that we get the correct exception when calling START STREAM with BATCH_LIMIT while a CHECK STREAM is already running.
# 2/ Afterwards, we terminate the CHECK STREAM and run START STREAM with BATCH_LIMIT
def start_check_stream(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}")
def start_new_stream_with_limit(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout)
STREAM_NAME = "test_check_and_batch_limit"
BATCH_LIMIT = 1
TIMEOUT = 10000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
# 0/ Extra setup needed for Kafka to work correctly if CHECK STREAM is executed before any messages have been consumed.
if setup_function is not None:
setup_function(start_check_stream, cursor, STREAM_NAME, BATCH_LIMIT, TIMEOUT)
# 1/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, BATCH_LIMIT, timeout=TIMEOUT)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
thread_stream_check.join()
assert not get_is_running(cursor, STREAM_NAME)
# 2/
thread_stream_running = Process(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT + 1, TIMEOUT)
) # Use BATCH_LIMIT + 1 as the limit, since BATCH_LIMIT messages were already sent during the CHECK STREAM (and not consumed)
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
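The worker Processes in this test are joined without a timeout, so a stream that never consumes its last message would hang the suite. A defensive join, sketched here as a hypothetical helper:
def join_or_fail(process, timeout_s=30.0):
    # Join a worker Process, but fail the test instead of hanging forever.
    process.join(timeout_s)
    assert not process.is_alive(), "worker process did not finish in time"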
def test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender):
# 1/ We check that we get the correct exception when calling CHECK STREAM while START STREAM with BATCH_LIMIT is already running
# 2/ Afterwards, we terminate the START STREAM with BATCH_LIMIT and run CHECK STREAM
def start_new_stream_with_limit(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
start_stream_with_limit(cursor, stream_name, batch_limit, timeout=timeout)
def start_check_stream(stream_name, batch_limit, timeout):
connection = connect()
cursor = connection.cursor()
execute_and_fetch_all(cursor, f"CHECK STREAM {stream_name} BATCH_LIMIT {batch_limit} TIMEOUT {timeout}")
STREAM_NAME = "test_batch_limit_and_check"
BATCH_LIMIT = 1
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
# 1/
thread_stream_running = Process(
target=start_new_stream_with_limit, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT)
)
start_time = time.time()
thread_stream_running.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {BATCH_LIMIT} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
message_sender(SIMPLE_MSG)
time.sleep(2)
assert not get_is_running(cursor, STREAM_NAME)
# 2/
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(STREAM_NAME, BATCH_LIMIT, TIMEOUT))
start_time = time.time()
thread_stream_check.start()
time.sleep(2)
assert get_is_running(cursor, STREAM_NAME)
message_sender(SIMPLE_MSG)
time.sleep(2)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
assert not get_is_running(cursor, STREAM_NAME)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):
# We check that we get the expected exception when giving a negative or zero batch_limit
STREAM_NAME = "test_batch_limit_invalid_batch_limit"
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
time.sleep(2)
# 1/ checking with batch_limit=-10
batch_limit = -10
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!"
# 2/ checking with batch_limit=0
batch_limit = 0
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
start_stream_with_limit(cursor, STREAM_NAME, batch_limit, timeout=TIMEOUT)
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The START STREAM has probably thrown due to timeout!"
def test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator):
# We check that we get the expected exception when giving a negative or zero batch_limit
STREAM_NAME = "test_batch_limit_invalid_batch_limit"
TIMEOUT = 10000
TIMEOUT_IN_SECONDS = TIMEOUT / 1000
cursor = connection.cursor()
execute_and_fetch_all(cursor, stream_creator(STREAM_NAME))
time.sleep(2)
# 1/ checking with batch_limit=-10
batch_limit = -10
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
# 2/ checking with batch_limit=0
batch_limit = 0
start_time = time.time()
with pytest.raises(mgclient.DatabaseError):
execute_and_fetch_all(cursor, f"CHECK STREAM {STREAM_NAME} BATCH_LIMIT {batch_limit} TIMEOUT {TIMEOUT}")
end_time = time.time()
assert (end_time - start_time) < 0.8 * TIMEOUT_IN_SECONDS, "The CHECK STREAM has probably thrown due to timeout!"
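The negative and zero probes are duplicated verbatim across the two tests above. If these helpers ever become pytest-collected tests themselves, a parametrized variant could collapse the copies; a sketch under that assumption (fixture names mirror the Kafka/Pulsar files below, imports as at the top of this module):
@pytest.mark.parametrize("invalid_batch_limit", [-10, 0])
def test_invalid_batch_limit(connection, stream_creator, invalid_batch_limit):
    # One test case per invalid value instead of two copy-pasted blocks.
    cursor = connection.cursor()
    execute_and_fetch_all(cursor, stream_creator("test_invalid_limit"))
    with pytest.raises(mgclient.DatabaseError):
        start_stream_with_limit(cursor, "test_invalid_limit", invalid_batch_limit, timeout=10000)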

View File

@ -18,7 +18,7 @@ import time
from multiprocessing import Process, Value
import common
TRANSFORMATIONS_TO_CHECK_C = ["empty_transformation"]
TRANSFORMATIONS_TO_CHECK_C = ["c_transformations.empty_transformation"]
TRANSFORMATIONS_TO_CHECK_PY = ["kafka_transform.simple", "kafka_transform.with_parameters"]
@ -381,10 +381,11 @@ def test_info_procedure(kafka_topics, connection):
@pytest.mark.parametrize("transformation", TRANSFORMATIONS_TO_CHECK_C)
def test_load_c_transformations(connection, transformation):
cursor = connection.cursor()
query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH 'c_transformations.{transformation}' RETURN name"
query = f"CALL mg.transformations() YIELD * WITH name WHERE name STARTS WITH '{transformation}' RETURN name"
result = common.execute_and_fetch_all(cursor, query)
assert len(result) == 1
assert result[0][0] == f"c_transformations.{transformation}"
assert result[0][0] == transformation
def test_check_stream_same_number_of_queries_than_messages(kafka_producer, kafka_topics, connection):
@ -415,5 +416,100 @@ def test_check_stream_different_number_of_queries_than_messages(kafka_producer,
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def messages_sender(nof_messages):
for x in range(nof_messages):
kafka_producer.send(kafka_topics[0], common.SIMPLE_MSG).get(timeout=60)
common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_reaching_timeout(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name, batch_size):
return f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE {batch_size}"
common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_while_check_running(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
def setup_function(start_check_stream, cursor, stream_name, batch_limit, timeout):
thread_stream_check = Process(target=start_check_stream, daemon=True, args=(stream_name, batch_limit, timeout))
thread_stream_check.start()
time.sleep(2)
assert common.get_is_running(cursor, stream_name)
message_sender(common.SIMPLE_MSG)
thread_stream_check.join()
common.test_start_stream_with_batch_limit_while_check_running(
connection, stream_creator, message_sender, setup_function
)
def test_check_while_stream_with_batch_limit_running(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
def message_sender(message):
kafka_producer.send(kafka_topics[0], message).get(timeout=6000)
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
def test_check_stream_with_batch_limit_with_invalid_batch_limit(kafka_producer, kafka_topics, connection):
assert len(kafka_topics) > 0
def stream_creator(stream_name):
return (
f"CREATE KAFKA STREAM {stream_name} TOPICS {kafka_topics[0]} TRANSFORM kafka_transform.simple BATCH_SIZE 1"
)
common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -344,6 +344,73 @@ def test_service_url(pulsar_client, pulsar_topics, connection, transformation):
check_vertex_exists_with_topic_and_payload(cursor, topic, common.SIMPLE_MSG)
def test_start_stream_with_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def messages_sender(nof_messages):
for x in range(nof_messages):
producer.send(common.SIMPLE_MSG)
common.test_start_stream_with_batch_limit(connection, stream_creator, messages_sender)
def test_start_stream_with_batch_limit_timeout(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_start_stream_with_batch_limit_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_reaching_timeout(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
def stream_creator(stream_name, batch_size):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE {batch_size}"
common.test_start_stream_with_batch_limit_reaching_timeout(connection, stream_creator)
def test_start_stream_with_batch_limit_while_check_running(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(message):
producer.send(message)
common.test_start_stream_with_batch_limit_while_check_running(connection, stream_creator, message_sender)
def test_check_while_stream_with_batch_limit_running(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
producer = pulsar_client.create_producer(
common.pulsar_default_namespace_topic(pulsar_topics[0]), send_timeout_millis=60000
)
def message_sender(message):
producer.send(message)
common.test_check_while_stream_with_batch_limit_running(connection, stream_creator, message_sender)
def test_check_stream_same_number_of_queries_than_messages(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
@ -380,5 +447,23 @@ def test_check_stream_different_number_of_queries_than_messages(pulsar_client, p
common.test_check_stream_different_number_of_queries_than_messages(connection, stream_creator, message_sender)
def test_start_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_start_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
def test_check_stream_with_batch_limit_with_invalid_batch_limit(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 0
def stream_creator(stream_name):
return f"CREATE PULSAR STREAM {stream_name} TOPICS {pulsar_topics[0]} TRANSFORM pulsar_transform.simple BATCH_SIZE 1"
common.test_check_stream_with_batch_limit_with_invalid_batch_limit(connection, stream_creator)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -149,7 +149,7 @@ TEST_F(ConsumerTest, BatchInterval) {
}
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
auto check_received_timestamp = [&received_timestamps](size_t index) {
SCOPED_TRACE("Checking index " + std::to_string(index));
@ -178,14 +178,6 @@ TEST_F(ConsumerTest, BatchInterval) {
TEST_F(ConsumerTest, StartStop) {
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
auto start = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StartIfStopped();
} else {
consumer.Start();
}
};
auto stop = [&consumer](const bool use_conditional) {
if (use_conditional) {
consumer.StopIfRunning();
@ -194,34 +186,28 @@ TEST_F(ConsumerTest, StartStop) {
}
};
auto check_config = [&start, &stop, &consumer](const bool use_conditional_start,
const bool use_conditional_stop) mutable {
SCOPED_TRACE(
fmt::format("Conditional start {} and conditional stop {}", use_conditional_start, use_conditional_stop));
auto check_config = [&stop, &consumer](const bool use_conditional_stop) mutable {
SCOPED_TRACE(fmt::format("Start and conditionally stop {}", use_conditional_stop));
EXPECT_FALSE(consumer.IsRunning());
EXPECT_THROW(consumer.Stop(), ConsumerStoppedException);
consumer.StopIfRunning();
EXPECT_FALSE(consumer.IsRunning());
start(use_conditional_start);
consumer.Start();
EXPECT_TRUE(consumer.IsRunning());
EXPECT_THROW(consumer.Start(), ConsumerRunningException);
consumer.StartIfStopped();
EXPECT_TRUE(consumer.IsRunning());
stop(use_conditional_stop);
EXPECT_FALSE(consumer.IsRunning());
};
static constexpr auto kSimpleStart = false;
static constexpr auto kSimpleStop = false;
static constexpr auto kConditionalStart = true;
static constexpr auto kConditionalStop = true;
check_config(kSimpleStart, kSimpleStop);
check_config(kSimpleStart, kConditionalStop);
check_config(kConditionalStart, kSimpleStop);
check_config(kConditionalStart, kConditionalStop);
check_config(kSimpleStop);
check_config(kConditionalStop);
}
TEST_F(ConsumerTest, BatchSize) {
@ -252,7 +238,7 @@ TEST_F(ConsumerTest, BatchSize) {
}
std::this_thread::sleep_for(kBatchInterval * 2);
consumer->Stop();
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
auto check_received_timestamp = [&received_timestamps](size_t index, size_t expected_message_count) {
SCOPED_TRACE("Checking index " + std::to_string(index));
@ -371,7 +357,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
EXPECT_EQ(expected_total_messages, received_message_count);
EXPECT_NO_THROW(consumer->Stop());
ASSERT_FALSE(consumer->IsRunning());
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
};
ASSERT_NO_FATAL_FAILURE(send_and_consume_messages(2));
@ -383,10 +369,9 @@ TEST_F(ConsumerTest, CheckMethodWorks) {
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
const std::string kMessagePrefix{"Message"};
auto consumer_function = [](const std::vector<Message> &messages) mutable {};
// This test depends on CreateConsumer starting and stopping the consumer, so that the offset is stored
auto consumer = CreateConsumer(std::move(info), std::move(consumer_function));
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
static constexpr auto kMessageCount = 4;
for (auto sent_messages = 0; sent_messages < kMessageCount; ++sent_messages) {
@ -411,7 +396,7 @@ TEST_F(ConsumerTest, CheckMethodWorks) {
});
ASSERT_FALSE(consumer->IsRunning());
EXPECT_TRUE(expected_messages_received) << "Some unexpected message have been received";
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
EXPECT_EQ(received_message_count, kMessageCount);
};
@ -445,8 +430,6 @@ TEST_F(ConsumerTest, CheckWithInvalidTimeout) {
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{-1}, std::nullopt, kDummyConsumerFunction),
ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
static constexpr std::chrono::seconds kMaxExpectedTimeout{2};
@ -459,7 +442,6 @@ TEST_F(ConsumerTest, CheckWithInvalidBatchSize) {
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException);
EXPECT_THROW(consumer.Check(std::nullopt, -1, kDummyConsumerFunction), ConsumerCheckFailedException);
const auto end = std::chrono::steady_clock::now();
static constexpr std::chrono::seconds kMaxExpectedTimeout{2};
@ -496,8 +478,85 @@ TEST_F(ConsumerTest, ConsumerStatus) {
check_info(consumer.Info());
consumer.Start();
check_info(consumer.Info());
consumer.StartIfStopped();
check_info(consumer.Info());
consumer.StopIfRunning();
check_info(consumer.Info());
}
TEST_F(ConsumerTest, LimitBatches_CannotStartIfAlreadyRunning) {
static constexpr auto kLimitBatches = 3;
auto info = CreateDefaultConsumerInfo();
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
consumer->Start();
ASSERT_TRUE(consumer->IsRunning());
EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, std::nullopt /*timeout*/), ConsumerRunningException);
EXPECT_TRUE(consumer->IsRunning());
consumer->Stop();
EXPECT_FALSE(consumer->IsRunning());
}
TEST_F(ConsumerTest, LimitBatches_SendingMoreThanLimit) {
/*
We send more messages than BatchSize * LimitBatches:
- Consumer should receive 2 * 3 = 6 messages.
- Consumer should not be running afterwards.
*/
static constexpr auto kBatchSize = 2;
static constexpr auto kLimitBatches = 3;
static constexpr auto kNumberOfMessagesToSend = 20;
static constexpr auto kNumberOfMessagesExpected = kBatchSize * kLimitBatches;
static constexpr auto kBatchInterval =
std::chrono::seconds{2}; // We do not want the batch interval to be the limiting factor here.
auto info = CreateDefaultConsumerInfo();
info.batch_size = kBatchSize;
info.batch_interval = kBatchInterval;
static constexpr std::string_view kMessage = "LimitBatchesTestMessage";
auto expected_messages_received = true;
auto number_of_messages_received = 0;
auto consumer_function = [&expected_messages_received,
&number_of_messages_received](const std::vector<Message> &messages) mutable {
number_of_messages_received += messages.size();
for (const auto &message : messages) {
expected_messages_received &= (kMessage == std::string_view(message.Payload().data(), message.Payload().size()));
}
};
auto consumer = CreateConsumer(std::move(info), consumer_function);
for (auto sent_messages = 0; sent_messages < kNumberOfMessagesToSend; ++sent_messages) {
cluster.SeedTopic(kTopicName, kMessage);
}
consumer->StartWithLimit(kLimitBatches, kDontCareTimeout);
EXPECT_FALSE(consumer->IsRunning());
EXPECT_EQ(number_of_messages_received, kNumberOfMessagesExpected);
EXPECT_TRUE(expected_messages_received) << "Some unexpected message has been received";
}
TEST_F(ConsumerTest, LimitBatches_Timeout_Reached) {
// We do not send any messages; we expect an exception to be thrown.
static constexpr auto kLimitBatches = 3;
auto info = CreateDefaultConsumerInfo();
auto consumer = CreateConsumer(std::move(info), kDummyConsumerFunction);
std::chrono::milliseconds timeout{3000};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer->StartWithLimit(kLimitBatches, timeout), ConsumerStartFailedException);
const auto end = std::chrono::steady_clock::now();
const auto elapsed = (end - start);
EXPECT_LE(timeout, elapsed);
EXPECT_LE(elapsed, timeout * 1.2);
}
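At the query level, the behavior exercised by these unit tests surfaces as the blocking START STREAM ... BATCH_LIMIT ... TIMEOUT form. A minimal mgclient sketch of the user-facing call (the connection parameters are assumptions, not taken from this change set):
import mgclient

conn = mgclient.connect(host="127.0.0.1", port=7687)  # assumed host/port
conn.autocommit = True
cursor = conn.cursor()
# Blocks until 3 batches are consumed; raises a DatabaseError if 3000 ms pass first.
cursor.execute("START STREAM my_stream BATCH_LIMIT 3 TIMEOUT 3000")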

View File

@ -32,6 +32,13 @@ class ReplicationTest : public ::testing::Test {
void TearDown() override { Clear(); }
memgraph::storage::Config configuration{
.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}};
private:
void Clear() {
if (!std::filesystem::exists(storage_directory)) return;
@ -40,19 +47,9 @@ class ReplicationTest : public ::testing::Test {
};
TEST_F(ReplicationTest, BasicSynchronousReplicationTest) {
memgraph::storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage replica_store(configuration);
replica_store.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
ASSERT_FALSE(main_store
@ -483,19 +480,9 @@ TEST_F(ReplicationTest, RecoveryProcess) {
}
TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
memgraph::storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store_async(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage replica_store_async(configuration);
replica_store_async.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 20000});
@ -533,28 +520,13 @@ TEST_F(ReplicationTest, BasicAsynchronousReplicationTest) {
}
TEST_F(ReplicationTest, EpochTest) {
memgraph::storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store1(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage replica_store1(configuration);
replica_store1.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10000});
memgraph::storage::Storage replica_store2(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(memgraph::io::network::Endpoint{"127.0.0.1", 10001});
@ -639,30 +611,15 @@ TEST_F(ReplicationTest, EpochTest) {
}
TEST_F(ReplicationTest, ReplicationInformation) {
memgraph::storage::Storage main_store(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store1(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10000};
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10000};
memgraph::storage::Storage replica_store2(
{.items = {.properties_on_edges = true},
.durability = {
.storage_directory = storage_directory,
.snapshot_wal_mode = memgraph::storage::Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL,
}});
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
@ -700,3 +657,55 @@ TEST_F(ReplicationTest, ReplicationInformation) {
ASSERT_EQ(second_info.endpoint, replica2_endpoint);
ASSERT_EQ(second_info.state, memgraph::storage::replication::ReplicaState::READY);
}
TEST_F(ReplicationTest, ReplicationReplicaWithExistingName) {
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10002};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
.HasError());
const std::string replica2_name{"REPLICA1"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::NAME_EXISTS);
}
TEST_F(ReplicationTest, ReplicationReplicaWithExistingEndPoint) {
memgraph::storage::Storage main_store(configuration);
memgraph::storage::Storage replica_store1(configuration);
const memgraph::io::network::Endpoint replica1_endpoint{"127.0.0.1", 10001};
replica_store1.SetReplicaRole(replica1_endpoint);
const memgraph::io::network::Endpoint replica2_endpoint{"127.0.0.1", 10001};
memgraph::storage::Storage replica_store2(configuration);
replica_store2.SetReplicaRole(replica2_endpoint);
const std::string replica1_name{"REPLICA1"};
ASSERT_FALSE(main_store
.RegisterReplica(replica1_name, replica1_endpoint,
memgraph::storage::replication::ReplicationMode::SYNC, {.timeout = 2.0})
.HasError());
const std::string replica2_name{"REPLICA2"};
ASSERT_TRUE(
main_store
.RegisterReplica(replica2_name, replica2_endpoint, memgraph::storage::replication::ReplicationMode::ASYNC)
.GetError() == memgraph::storage::Storage::RegisterReplicaError::END_POINT_EXISTS);
}