From d4f0bb0e38f4b638afe6a982ce68067462e421c5 Mon Sep 17 00:00:00 2001 From: Jeremy B <97525434+42jeremy@users.noreply.github.com> Date: Tue, 9 Aug 2022 11:29:55 +0200 Subject: [PATCH] Correct inconsistencies w.r.t. sync replication (#435) Add a report for the case where a sync replica does not confirm within a timeout: -Add a new exception: ReplicationException to be returned when one sync replica does not confirm the reception of messages (new data, new constraint/index, or for triggers) -Update the logic to throw the ReplicationException when needed for insertion of new data, triggers, or creation of new constraint/index -Add end-to-end tests to cover the loss of connection with sync/async replicas when adding new data, adding new constraint/indexes, and triggers Add end-to-end tests to cover the creation and drop of indexes, existence constraints, and uniqueness constraints Improved tooling function mg_sleep_and_assert to also show the last result when duration is exceeded --- src/memgraph.cpp | 9 +- src/query/db_accessor.hpp | 2 +- src/query/exceptions.hpp | 7 + src/query/interpreter.cpp | 344 ++++-- src/query/metadata.cpp | 10 +- src/query/metadata.hpp | 8 +- .../v2/replication/replication_client.cpp | 13 +- .../v2/replication/replication_client.hpp | 5 +- .../v2/replication/replication_server.cpp | 28 +- src/storage/v2/storage.cpp | 204 +++- src/storage/v2/storage.hpp | 113 +- src/storage/v2/storage_error.hpp | 38 + tests/benchmark/expansion.cpp | 2 +- tests/benchmark/query/execution.cpp | 4 +- tests/concurrent/storage_indices.cpp | 5 +- tests/e2e/configuration/default_config.py | 1 - tests/e2e/mg_utils.py | 4 +- .../show_while_creating_invalid_state.py | 1078 ++++++++++++++++- tests/jepsen/src/jepsen/memgraph/bank.clj | 15 +- tests/jepsen/src/jepsen/memgraph/basic.clj | 12 +- tests/jepsen/src/jepsen/memgraph/client.clj | 8 +- tests/jepsen/src/jepsen/memgraph/core.clj | 2 - tests/jepsen/src/jepsen/memgraph/large.clj | 21 +- .../jepsen/src/jepsen/memgraph/sequential.clj | 154 --- tests/jepsen/src/jepsen/memgraph/support.clj | 3 +- tests/unit/query_cost_estimator.cpp | 4 +- tests/unit/query_dump.cpp | 24 +- ...query_plan_v2_create_set_remove_delete.cpp | 2 +- tests/unit/storage_v2_constraints.cpp | 76 +- tests/unit/storage_v2_durability.cpp | 16 +- tests/unit/storage_v2_gc.cpp | 2 +- tests/unit/storage_v2_indices.cpp | 52 +- tests/unit/storage_v2_replication.cpp | 20 +- 33 files changed, 1754 insertions(+), 532 deletions(-) create mode 100644 src/storage/v2/storage_error.hpp delete mode 100644 tests/jepsen/src/jepsen/memgraph/sequential.clj diff --git a/src/memgraph.cpp b/src/memgraph.cpp index fb38289c6..afcabf99b 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -214,11 +214,6 @@ DEFINE_bool(telemetry_enabled, false, "the database runtime (vertex and edge counts and resource usage) " "to allow for easier improvement of the product."); -// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_bool(storage_restore_replicas_on_startup, true, - "Controls replicas should be restored automatically."); // TODO(42jeremy) this must be removed once T0835 - // is implemented. 
- // Streams flags // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_uint32( @@ -905,6 +900,8 @@ class BoltSession final : public memgraph::communication::bolt::SessionAdvanceCommand(); } - utils::BasicResult Commit() { return accessor_->Commit(); } + utils::BasicResult Commit() { return accessor_->Commit(); } void Abort() { accessor_->Abort(); } diff --git a/src/query/exceptions.hpp b/src/query/exceptions.hpp index b8a44d85d..ce2904882 100644 --- a/src/query/exceptions.hpp +++ b/src/query/exceptions.hpp @@ -230,4 +230,11 @@ class VersionInfoInMulticommandTxException : public QueryException { : QueryException("Version info query not allowed in multicommand transactions.") {} }; +class ReplicationException : public utils::BasicException { + public: + using utils::BasicException::BasicException; + explicit ReplicationException(const std::string &message) + : utils::BasicException("Replication Exception: {} Check the status of the replicas using 'SHOW REPLICA' query.", + message) {} +}; } // namespace memgraph::query diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index f0fe0d925..9bcb16535 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include "glue/communication.hpp" #include "memory/memory_control.hpp" @@ -75,6 +76,9 @@ extern const Event TriggersCreated; namespace memgraph::query { +template +constexpr auto kAlwaysFalse = false; + namespace { void UpdateTypeCount(const plan::ReadWriteTypeChecker::RWType type) { switch (type) { @@ -1383,23 +1387,34 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans handler = [interpreter_context, label, properties_stringified = std::move(properties_stringified), label_name = index_query->label_.name, properties = std::move(properties), invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) { - if (properties.empty()) { - if (!interpreter_context->db->CreateIndex(label)) { - index_notification.code = NotificationCode::EXISTANT_INDEX; - index_notification.title = - fmt::format("Index on label {} on properties {} already exists.", label_name, properties_stringified); - } - EventCounter::IncrementCounter(EventCounter::LabelIndexCreated); + MG_ASSERT(properties.size() <= 1U); + auto maybe_index_error = properties.empty() ? 
interpreter_context->db->CreateIndex(label) + : interpreter_context->db->CreateIndex(label, properties[0]); + utils::OnScopeExit invalidator(invalidate_plan_cache); + + if (maybe_index_error.HasError()) { + const auto &error = maybe_index_error.GetError(); + std::visit( + [&index_notification, &label_name, &properties_stringified](T &&) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + EventCounter::IncrementCounter(EventCounter::LabelIndexCreated); + throw ReplicationException( + fmt::format("At least one SYNC replica has not confirmed the creation of the index on label {} " + "on properties {}.", + label_name, properties_stringified)); + } else if constexpr (std::is_same_v) { + index_notification.code = NotificationCode::EXISTENT_INDEX; + index_notification.title = fmt::format("Index on label {} on properties {} already exists.", + label_name, properties_stringified); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } else { - MG_ASSERT(properties.size() == 1U); - if (!interpreter_context->db->CreateIndex(label, properties[0])) { - index_notification.code = NotificationCode::EXISTANT_INDEX; - index_notification.title = - fmt::format("Index on label {} on properties {} already exists.", label_name, properties_stringified); - } - EventCounter::IncrementCounter(EventCounter::LabelPropertyIndexCreated); + EventCounter::IncrementCounter(EventCounter::LabelIndexCreated); } - invalidate_plan_cache(); }; break; } @@ -1410,21 +1425,31 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans handler = [interpreter_context, label, properties_stringified = std::move(properties_stringified), label_name = index_query->label_.name, properties = std::move(properties), invalidate_plan_cache = std::move(invalidate_plan_cache)](Notification &index_notification) { - if (properties.empty()) { - if (!interpreter_context->db->DropIndex(label)) { - index_notification.code = NotificationCode::NONEXISTANT_INDEX; - index_notification.title = - fmt::format("Index on label {} on properties {} doesn't exist.", label_name, properties_stringified); - } - } else { - MG_ASSERT(properties.size() == 1U); - if (!interpreter_context->db->DropIndex(label, properties[0])) { - index_notification.code = NotificationCode::NONEXISTANT_INDEX; - index_notification.title = - fmt::format("Index on label {} on properties {} doesn't exist.", label_name, properties_stringified); - } + MG_ASSERT(properties.size() <= 1U); + auto maybe_index_error = properties.empty() ? 
interpreter_context->db->DropIndex(label) + : interpreter_context->db->DropIndex(label, properties[0]); + utils::OnScopeExit invalidator(invalidate_plan_cache); + + if (maybe_index_error.HasError()) { + const auto &error = maybe_index_error.GetError(); + std::visit( + [&index_notification, &label_name, &properties_stringified](T &&) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + throw ReplicationException( + fmt::format("At least one SYNC replica has not confirmed the dropping of the index on label {} " + "on properties {}.", + label_name, properties_stringified)); + } else if constexpr (std::is_same_v) { + index_notification.code = NotificationCode::NONEXISTENT_INDEX; + index_notification.title = fmt::format("Index on label {} on properties {} doesn't exist.", + label_name, properties_stringified); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } - invalidate_plan_cache(); }; break; } @@ -1973,21 +1998,37 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name, properties_stringified = std::move(properties_stringified), properties = std::move(properties)](Notification &constraint_notification) { - auto res = interpreter_context->db->CreateExistenceConstraint(label, properties[0]); - if (res.HasError()) { - auto violation = res.GetError(); - auto label_name = interpreter_context->db->LabelToName(violation.label); - MG_ASSERT(violation.properties.size() == 1U); - auto property_name = interpreter_context->db->PropertyToName(*violation.properties.begin()); - throw QueryRuntimeException( - "Unable to create existence constraint :{}({}), because an " - "existing node violates it.", - label_name, property_name); - } - if (res.HasValue() && !res.GetValue()) { - constraint_notification.code = NotificationCode::EXISTANT_CONSTRAINT; - constraint_notification.title = fmt::format( - "Constraint EXISTS on label {} on properties {} already exists.", label_name, properties_stringified); + auto maybe_constraint_error = interpreter_context->db->CreateExistenceConstraint(label, properties[0]); + + if (maybe_constraint_error.HasError()) { + const auto &error = maybe_constraint_error.GetError(); + std::visit( + [&interpreter_context, &label_name, &properties_stringified, + &constraint_notification](T &&arg) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + auto &violation = arg; + MG_ASSERT(violation.properties.size() == 1U); + auto property_name = interpreter_context->db->PropertyToName(*violation.properties.begin()); + throw QueryRuntimeException( + "Unable to create existence constraint :{}({}), because an " + "existing node violates it.", + label_name, property_name); + } else if constexpr (std::is_same_v) { + constraint_notification.code = NotificationCode::EXISTENT_CONSTRAINT; + constraint_notification.title = + fmt::format("Constraint EXISTS on label {} on properties {} already exists.", label_name, + properties_stringified); + } else if constexpr (std::is_same_v) { + throw ReplicationException( + "At least one SYNC replica has not confirmed the creation of the EXISTS constraint on label " + "{} on properties {}.", + label_name, properties_stringified); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } }; break; @@ -2005,21 +2046,35 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ handler = 
[interpreter_context, label, label_name = constraint_query->constraint_.label.name, properties_stringified = std::move(properties_stringified), property_set = std::move(property_set)](Notification &constraint_notification) { - auto res = interpreter_context->db->CreateUniqueConstraint(label, property_set); - if (res.HasError()) { - auto violation = res.GetError(); - auto label_name = interpreter_context->db->LabelToName(violation.label); - std::stringstream property_names_stream; - utils::PrintIterable(property_names_stream, violation.properties, ", ", - [&interpreter_context](auto &stream, const auto &prop) { - stream << interpreter_context->db->PropertyToName(prop); - }); - throw QueryRuntimeException( - "Unable to create unique constraint :{}({}), because an " - "existing node violates it.", - label_name, property_names_stream.str()); + auto maybe_constraint_error = interpreter_context->db->CreateUniqueConstraint(label, property_set); + if (maybe_constraint_error.HasError()) { + const auto &error = maybe_constraint_error.GetError(); + std::visit( + [&interpreter_context, &label_name, &properties_stringified](T &&arg) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + auto &violation = arg; + auto violation_label_name = interpreter_context->db->LabelToName(violation.label); + std::stringstream property_names_stream; + utils::PrintIterable(property_names_stream, violation.properties, ", ", + [&interpreter_context](auto &stream, const auto &prop) { + stream << interpreter_context->db->PropertyToName(prop); + }); + throw QueryRuntimeException( + "Unable to create unique constraint :{}({}), because an " + "existing node violates it.", + violation_label_name, property_names_stream.str()); + } else if constexpr (std::is_same_v) { + throw ReplicationException(fmt::format( + "At least one SYNC replica has not confirmed the creation of the UNIQUE constraint: {}({}).", + label_name, properties_stringified)); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } - switch (res.GetValue()) { + switch (maybe_constraint_error.GetValue()) { case storage::UniqueConstraints::CreationStatus::EMPTY_PROPERTIES: throw SyntaxException( "At least one property must be used for unique " @@ -2030,7 +2085,7 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ "for unique constraints is exceeded.", storage::kUniqueConstraintsMaxProperties); case storage::UniqueConstraints::CreationStatus::ALREADY_EXISTS: - constraint_notification.code = NotificationCode::EXISTANT_CONSTRAINT; + constraint_notification.code = NotificationCode::EXISTENT_CONSTRAINT; constraint_notification.title = fmt::format("Constraint UNIQUE on label {} on properties {} already exists.", label_name, properties_stringified); @@ -2058,10 +2113,27 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name, properties_stringified = std::move(properties_stringified), properties = std::move(properties)](Notification &constraint_notification) { - if (!interpreter_context->db->DropExistenceConstraint(label, properties[0])) { - constraint_notification.code = NotificationCode::NONEXISTANT_CONSTRAINT; - constraint_notification.title = fmt::format( - "Constraint EXISTS on label {} on properties {} doesn't exist.", label_name, properties_stringified); + auto maybe_constraint_error = interpreter_context->db->DropExistenceConstraint(label, 
properties[0]); + if (maybe_constraint_error.HasError()) { + const auto &error = maybe_constraint_error.GetError(); + std::visit( + [&label_name, &properties_stringified, &constraint_notification](T &&) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + constraint_notification.code = NotificationCode::NONEXISTENT_CONSTRAINT; + constraint_notification.title = + fmt::format("Constraint EXISTS on label {} on properties {} doesn't exist.", label_name, + properties_stringified); + } else if constexpr (std::is_same_v) { + throw ReplicationException( + fmt::format("At least one SYNC replica has not confirmed the dropping of the EXISTS " + "constraint on label {} on properties {}.", + label_name, properties_stringified)); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } return std::vector>(); }; @@ -2080,7 +2152,24 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ handler = [interpreter_context, label, label_name = constraint_query->constraint_.label.name, properties_stringified = std::move(properties_stringified), property_set = std::move(property_set)](Notification &constraint_notification) { - auto res = interpreter_context->db->DropUniqueConstraint(label, property_set); + auto maybe_constraint_error = interpreter_context->db->DropUniqueConstraint(label, property_set); + if (maybe_constraint_error.HasError()) { + const auto &error = maybe_constraint_error.GetError(); + std::visit( + [&label_name, &properties_stringified](T &&) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + throw ReplicationException( + fmt::format("At least one SYNC replica has not confirmed the dropping of the UNIQUE " + "constraint on label {} on properties {}.", + label_name, properties_stringified)); + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); + } + const auto &res = maybe_constraint_error.GetValue(); switch (res) { case storage::UniqueConstraints::DeletionStatus::EMPTY_PROPERTIES: throw SyntaxException( @@ -2094,7 +2183,7 @@ PreparedQuery PrepareConstraintQuery(ParsedQuery parsed_query, bool in_explicit_ storage::kUniqueConstraintsMaxProperties); break; case storage::UniqueConstraints::DeletionStatus::NOT_FOUND: - constraint_notification.code = NotificationCode::NONEXISTANT_CONSTRAINT; + constraint_notification.code = NotificationCode::NONEXISTENT_CONSTRAINT; constraint_notification.title = fmt::format("Constraint UNIQUE on label {} on properties {} doesn't exist.", label_name, properties_stringified); @@ -2312,28 +2401,41 @@ void RunTriggersIndividually(const utils::SkipList &triggers, Interpret continue; } - auto maybe_constraint_violation = db_accessor.Commit(); - if (maybe_constraint_violation.HasError()) { - const auto &constraint_violation = maybe_constraint_violation.GetError(); - switch (constraint_violation.type) { - case storage::ConstraintViolation::Type::EXISTENCE: { - const auto &label_name = db_accessor.LabelToName(constraint_violation.label); - MG_ASSERT(constraint_violation.properties.size() == 1U); - const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin()); - spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on :{}({})", trigger.Name(), - label_name, property_name); - break; - } - case storage::ConstraintViolation::Type::UNIQUE: { - const auto &label_name = db_accessor.LabelToName(constraint_violation.label); - std::stringstream 
property_names_stream; - utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ", - [&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop); }); - spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})", trigger.Name(), - label_name, property_names_stream.str()); - break; - } - } + auto maybe_commit_error = db_accessor.Commit(); + if (maybe_commit_error.HasError()) { + const auto &error = maybe_commit_error.GetError(); + + std::visit( + [&trigger, &db_accessor](T &&arg) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + spdlog::warn("At least one SYNC replica has not confirmed execution of the trigger '{}'.", + trigger.Name()); + } else if constexpr (std::is_same_v) { + const auto &constraint_violation = arg; + switch (constraint_violation.type) { + case storage::ConstraintViolation::Type::EXISTENCE: { + const auto &label_name = db_accessor.LabelToName(constraint_violation.label); + MG_ASSERT(constraint_violation.properties.size() == 1U); + const auto &property_name = db_accessor.PropertyToName(*constraint_violation.properties.begin()); + spdlog::warn("Trigger '{}' failed to commit due to existence constraint violation on: {}({}) ", + trigger.Name(), label_name, property_name); + } + case storage::ConstraintViolation::Type::UNIQUE: { + const auto &label_name = db_accessor.LabelToName(constraint_violation.label); + std::stringstream property_names_stream; + utils::PrintIterable( + property_names_stream, constraint_violation.properties, ", ", + [&](auto &stream, const auto &prop) { stream << db_accessor.PropertyToName(prop); }); + spdlog::warn("Trigger '{}' failed to commit due to unique constraint violation on :{}({})", + trigger.Name(), label_name, property_names_stream.str()); + } + } + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } } } @@ -2374,32 +2476,45 @@ void Interpreter::Commit() { db_accessor_.reset(); trigger_context_collector_.reset(); }; + utils::OnScopeExit members_reseter(reset_necessary_members); - auto maybe_constraint_violation = db_accessor_->Commit(); - if (maybe_constraint_violation.HasError()) { - const auto &constraint_violation = maybe_constraint_violation.GetError(); - switch (constraint_violation.type) { - case storage::ConstraintViolation::Type::EXISTENCE: { - auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label); - MG_ASSERT(constraint_violation.properties.size() == 1U); - auto property_name = execution_db_accessor_->PropertyToName(*constraint_violation.properties.begin()); - reset_necessary_members(); - throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name, - property_name); - break; - } - case storage::ConstraintViolation::Type::UNIQUE: { - auto label_name = execution_db_accessor_->LabelToName(constraint_violation.label); - std::stringstream property_names_stream; - utils::PrintIterable( - property_names_stream, constraint_violation.properties, ", ", - [this](auto &stream, const auto &prop) { stream << execution_db_accessor_->PropertyToName(prop); }); - reset_necessary_members(); - throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name, - property_names_stream.str()); - break; - } - } + auto commit_confirmed_by_all_sync_repplicas = true; + + auto maybe_commit_error = db_accessor_->Commit(); + if (maybe_commit_error.HasError()) { + const auto &error = maybe_commit_error.GetError(); + 
+ std::visit( + [&execution_db_accessor = execution_db_accessor_, + &commit_confirmed_by_all_sync_repplicas](T &&arg) { + using ErrorType = std::remove_cvref_t; + if constexpr (std::is_same_v) { + commit_confirmed_by_all_sync_repplicas = false; + } else if constexpr (std::is_same_v) { + const auto &constraint_violation = arg; + auto &label_name = execution_db_accessor->LabelToName(constraint_violation.label); + switch (constraint_violation.type) { + case storage::ConstraintViolation::Type::EXISTENCE: { + MG_ASSERT(constraint_violation.properties.size() == 1U); + auto &property_name = execution_db_accessor->PropertyToName(*constraint_violation.properties.begin()); + throw QueryException("Unable to commit due to existence constraint violation on :{}({})", label_name, + property_name); + } + case storage::ConstraintViolation::Type::UNIQUE: { + std::stringstream property_names_stream; + utils::PrintIterable(property_names_stream, constraint_violation.properties, ", ", + [&execution_db_accessor](auto &stream, const auto &prop) { + stream << execution_db_accessor->PropertyToName(prop); + }); + throw QueryException("Unable to commit due to unique constraint violation on :{}({})", label_name, + property_names_stream.str()); + } + } + } else { + static_assert(kAlwaysFalse, "Missing type from variant visitor"); + } + }, + error); } // The ordered execution of after commit triggers is heavily depending on the exclusiveness of db_accessor_->Commit(): @@ -2418,9 +2533,10 @@ void Interpreter::Commit() { }); } - reset_necessary_members(); - SPDLOG_DEBUG("Finished committing the transaction"); + if (!commit_confirmed_by_all_sync_repplicas) { + throw ReplicationException("At least one SYNC replica has not confirmed committing last transaction."); + } } void Interpreter::AdvanceCommand() { diff --git a/src/query/metadata.cpp b/src/query/metadata.cpp index f4e8512fd..47b207bc0 100644 --- a/src/query/metadata.cpp +++ b/src/query/metadata.cpp @@ -52,15 +52,15 @@ constexpr std::string_view GetCodeString(const NotificationCode code) { return "DropStream"sv; case NotificationCode::DROP_TRIGGER: return "DropTrigger"sv; - case NotificationCode::EXISTANT_CONSTRAINT: + case NotificationCode::EXISTENT_CONSTRAINT: return "ConstraintAlreadyExists"sv; - case NotificationCode::EXISTANT_INDEX: + case NotificationCode::EXISTENT_INDEX: return "IndexAlreadyExists"sv; case NotificationCode::LOAD_CSV_TIP: return "LoadCSVTip"sv; - case NotificationCode::NONEXISTANT_INDEX: + case NotificationCode::NONEXISTENT_INDEX: return "IndexDoesNotExist"sv; - case NotificationCode::NONEXISTANT_CONSTRAINT: + case NotificationCode::NONEXISTENT_CONSTRAINT: return "ConstraintDoesNotExist"sv; case NotificationCode::REGISTER_REPLICA: return "RegisterReplica"sv; @@ -114,4 +114,4 @@ std::string ExecutionStatsKeyToString(const ExecutionStats::Key key) { } } -} // namespace memgraph::query \ No newline at end of file +} // namespace memgraph::query diff --git a/src/query/metadata.hpp b/src/query/metadata.hpp index 67f784fa8..9f72ea9de 100644 --- a/src/query/metadata.hpp +++ b/src/query/metadata.hpp @@ -34,11 +34,11 @@ enum class NotificationCode : uint8_t { DROP_REPLICA, DROP_STREAM, DROP_TRIGGER, - EXISTANT_INDEX, - EXISTANT_CONSTRAINT, + EXISTENT_INDEX, + EXISTENT_CONSTRAINT, LOAD_CSV_TIP, - NONEXISTANT_INDEX, - NONEXISTANT_CONSTRAINT, + NONEXISTENT_INDEX, + NONEXISTENT_CONSTRAINT, REPLICA_PORT_WARNING, REGISTER_REPLICA, SET_REPLICA, diff --git a/src/storage/v2/replication/replication_client.cpp b/src/storage/v2/replication/replication_client.cpp 
index 9b6401391..c55d68b75 100644 --- a/src/storage/v2/replication/replication_client.cpp +++ b/src/storage/v2/replication/replication_client.cpp @@ -222,23 +222,24 @@ void Storage::ReplicationClient::IfStreamingTransaction(const std::functionFinalizeTransactionReplicationInternal(); }); + thread_pool_.AddTask([this] { static_cast(this->FinalizeTransactionReplicationInternal()); }); + return true; } else { - FinalizeTransactionReplicationInternal(); + return FinalizeTransactionReplicationInternal(); } } -void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { +bool Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { MG_ASSERT(replica_stream_, "Missing stream for transaction deltas"); try { auto response = replica_stream_->Finalize(); @@ -249,6 +250,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { thread_pool_.AddTask([&, this] { this->RecoverReplica(response.current_commit_timestamp); }); } else { replica_state_.store(replication::ReplicaState::READY); + return true; } } catch (const rpc::RpcFailedException &) { replica_stream_.reset(); @@ -258,6 +260,7 @@ void Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { } HandleRpcFailure(); } + return false; } void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { diff --git a/src/storage/v2/replication/replication_client.hpp b/src/storage/v2/replication/replication_client.hpp index 71c674062..829f0ab60 100644 --- a/src/storage/v2/replication/replication_client.hpp +++ b/src/storage/v2/replication/replication_client.hpp @@ -103,7 +103,8 @@ class Storage::ReplicationClient { // StartTransactionReplication, stream is created. void IfStreamingTransaction(const std::function &callback); - void FinalizeTransactionReplication(); + // Return whether the transaction could be finalized on the replication client or not. + [[nodiscard]] bool FinalizeTransactionReplication(); // Transfer the snapshot file. // @param path Path of the snapshot file. 
@@ -125,7 +126,7 @@ class Storage::ReplicationClient { Storage::TimestampInfo GetTimestampInfo(); private: - void FinalizeTransactionReplicationInternal(); + [[nodiscard]] bool FinalizeTransactionReplicationInternal(); void RecoverReplica(uint64_t replica_commit); diff --git a/src/storage/v2/replication/replication_server.cpp b/src/storage/v2/replication/replication_server.cpp index fed501d6e..cdfd83886 100644 --- a/src/storage/v2/replication/replication_server.cpp +++ b/src/storage/v2/replication/replication_server.cpp @@ -495,14 +495,14 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * spdlog::trace(" Create label index on :{}", delta.operation_label.label); // Need to send the timestamp if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!"); - if (!storage_->CreateIndex(storage_->NameToLabel(delta.operation_label.label), timestamp)) + if (storage_->CreateIndex(storage_->NameToLabel(delta.operation_label.label), timestamp).HasError()) throw utils::BasicException("Invalid transaction!"); break; } case durability::WalDeltaData::Type::LABEL_INDEX_DROP: { spdlog::trace(" Drop label index on :{}", delta.operation_label.label); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!"); - if (!storage_->DropIndex(storage_->NameToLabel(delta.operation_label.label), timestamp)) + if (storage_->DropIndex(storage_->NameToLabel(delta.operation_label.label), timestamp).HasError()) throw utils::BasicException("Invalid transaction!"); break; } @@ -510,8 +510,10 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * spdlog::trace(" Create label+property index on :{} ({})", delta.operation_label_property.label, delta.operation_label_property.property); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!"); - if (!storage_->CreateIndex(storage_->NameToLabel(delta.operation_label_property.label), - storage_->NameToProperty(delta.operation_label_property.property), timestamp)) + if (storage_ + ->CreateIndex(storage_->NameToLabel(delta.operation_label_property.label), + storage_->NameToProperty(delta.operation_label_property.property), timestamp) + .HasError()) throw utils::BasicException("Invalid transaction!"); break; } @@ -519,8 +521,10 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * spdlog::trace(" Drop label+property index on :{} ({})", delta.operation_label_property.label, delta.operation_label_property.property); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!"); - if (!storage_->DropIndex(storage_->NameToLabel(delta.operation_label_property.label), - storage_->NameToProperty(delta.operation_label_property.property), timestamp)) + if (storage_ + ->DropIndex(storage_->NameToLabel(delta.operation_label_property.label), + storage_->NameToProperty(delta.operation_label_property.property), timestamp) + .HasError()) throw utils::BasicException("Invalid transaction!"); break; } @@ -531,16 +535,17 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * auto ret = storage_->CreateExistenceConstraint( storage_->NameToLabel(delta.operation_label_property.label), storage_->NameToProperty(delta.operation_label_property.property), timestamp); - if (!ret.HasValue() || !ret.GetValue()) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); break; } case 
durability::WalDeltaData::Type::EXISTENCE_CONSTRAINT_DROP: { spdlog::trace(" Drop existence constraint on :{} ({})", delta.operation_label_property.label, delta.operation_label_property.property); if (commit_timestamp_and_accessor) throw utils::BasicException("Invalid transaction!"); - if (!storage_->DropExistenceConstraint(storage_->NameToLabel(delta.operation_label_property.label), - storage_->NameToProperty(delta.operation_label_property.property), - timestamp)) + if (storage_ + ->DropExistenceConstraint(storage_->NameToLabel(delta.operation_label_property.label), + storage_->NameToProperty(delta.operation_label_property.property), timestamp) + .HasError()) throw utils::BasicException("Invalid transaction!"); break; } @@ -570,7 +575,8 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder * } auto ret = storage_->DropUniqueConstraint(storage_->NameToLabel(delta.operation_label_properties.label), properties, timestamp); - if (ret != UniqueConstraints::DeletionStatus::SUCCESS) throw utils::BasicException("Invalid transaction!"); + if (ret.HasError() || ret.GetValue() != UniqueConstraints::DeletionStatus::SUCCESS) + throw utils::BasicException("Invalid transaction!"); break; } } diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp index cbc41da86..b51607b9e 100644 --- a/src/storage/v2/storage.cpp +++ b/src/storage/v2/storage.cpp @@ -46,6 +46,7 @@ #include "storage/v2/replication/replication_client.hpp" #include "storage/v2/replication/replication_server.hpp" #include "storage/v2/replication/rpc.hpp" +#include "storage/v2/storage_error.hpp" namespace memgraph::storage { @@ -846,11 +847,13 @@ EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view name) { retu void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; } -utils::BasicResult Storage::Accessor::Commit( +utils::BasicResult Storage::Accessor::Commit( const std::optional desired_commit_timestamp) { MG_ASSERT(is_transaction_active_, "The transaction is already terminated!"); MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!"); + auto could_replicate_all_sync_replicas = true; + if (transaction_.deltas.empty()) { // We don't have to update the commit timestamp here because no one reads // it. 
@@ -869,7 +872,7 @@ utils::BasicResult Storage::Accessor::Commit( auto validation_result = ValidateExistenceConstraints(*prev.vertex, storage_->constraints_); if (validation_result) { Abort(); - return *validation_result; + return StorageDataManipulationError{*validation_result}; } } @@ -926,7 +929,7 @@ utils::BasicResult Storage::Accessor::Commit( // Replica can log only the write transaction received from Main // so the Wal files are consistent if (storage_->replication_role_ == ReplicationRole::MAIN || desired_commit_timestamp.has_value()) { - storage_->AppendToWal(transaction_, *commit_timestamp_); + could_replicate_all_sync_replicas = storage_->AppendToWalDataManipulation(transaction_, *commit_timestamp_); } // Take committed_transactions lock while holding the engine lock to @@ -954,11 +957,15 @@ utils::BasicResult Storage::Accessor::Commit( if (unique_constraint_violation) { Abort(); - return *unique_constraint_violation; + return StorageDataManipulationError{*unique_constraint_violation}; } } is_transaction_active_ = false; + if (!could_replicate_all_sync_replicas) { + return StorageDataManipulationError{ReplicationError{}}; + } + return {}; } @@ -1157,46 +1164,82 @@ EdgeTypeId Storage::NameToEdgeType(const std::string_view name) { return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name)); } -bool Storage::CreateIndex(LabelId label, const std::optional desired_commit_timestamp) { +utils::BasicResult Storage::CreateIndex( + LabelId label, const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); - if (!indices_.label_index.CreateIndex(label, vertices_.access())) return false; + if (!indices_.label_index.CreateIndex(label, vertices_.access())) { + return StorageIndexDefinitionError{IndexDefinitionError{}}; + } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {}, commit_timestamp); + const auto success = + AppendToWalDataDefinition(durability::StorageGlobalOperation::LABEL_INDEX_CREATE, label, {}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return true; + + if (success) { + return {}; + } + + return StorageIndexDefinitionError{ReplicationError{}}; } -bool Storage::CreateIndex(LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { +utils::BasicResult Storage::CreateIndex( + LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); - if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) return false; + if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) { + return StorageIndexDefinitionError{IndexDefinitionError{}}; + } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label, {property}, commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_CREATE, label, + {property}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return true; + + if (success) { + return {}; + } + + return StorageIndexDefinitionError{ReplicationError{}}; } -bool Storage::DropIndex(LabelId label, const std::optional desired_commit_timestamp) { +utils::BasicResult Storage::DropIndex( + LabelId label, const std::optional 
desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); - if (!indices_.label_index.DropIndex(label)) return false; + if (!indices_.label_index.DropIndex(label)) { + return StorageIndexDefinitionError{IndexDefinitionError{}}; + } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp); + auto success = + AppendToWalDataDefinition(durability::StorageGlobalOperation::LABEL_INDEX_DROP, label, {}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return true; + + if (success) { + return {}; + } + + return StorageIndexDefinitionError{ReplicationError{}}; } -bool Storage::DropIndex(LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { +utils::BasicResult Storage::DropIndex( + LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); - if (!indices_.label_property_index.DropIndex(label, property)) return false; + if (!indices_.label_property_index.DropIndex(label, property)) { + return StorageIndexDefinitionError{IndexDefinitionError{}}; + } // For a description why using `timestamp_` is correct, see // `CreateIndex(LabelId label)`. const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label, {property}, commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::LABEL_PROPERTY_INDEX_DROP, label, + {property}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return true; + + if (success) { + return {}; + } + + return StorageIndexDefinitionError{ReplicationError{}}; } IndicesInfo Storage::ListAllIndices() const { @@ -1204,55 +1247,92 @@ IndicesInfo Storage::ListAllIndices() const { return {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; } -utils::BasicResult Storage::CreateExistenceConstraint( +utils::BasicResult Storage::CreateExistenceConstraint( LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); auto ret = storage::CreateExistenceConstraint(&constraints_, label, property, vertices_.access()); - if (ret.HasError() || !ret.GetValue()) return ret; + if (ret.HasError()) { + return StorageExistenceConstraintDefinitionError{ret.GetError()}; + } + if (!ret.GetValue()) { + return StorageExistenceConstraintDefinitionError{ConstraintDefinitionError{}}; + } + const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label, {property}, commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_CREATE, label, + {property}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return true; + + if (success) { + return {}; + } + + return StorageExistenceConstraintDefinitionError{ReplicationError{}}; } -bool Storage::DropExistenceConstraint(LabelId label, PropertyId property, - const std::optional desired_commit_timestamp) { +utils::BasicResult Storage::DropExistenceConstraint( + LabelId label, PropertyId property, const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); - if 
(!storage::DropExistenceConstraint(&constraints_, label, property)) return false; - const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, {property}, commit_timestamp); - commit_log_->MarkFinished(commit_timestamp); - last_commit_timestamp_ = commit_timestamp; - return true; -} - -utils::BasicResult Storage::CreateUniqueConstraint( - LabelId label, const std::set &properties, const std::optional desired_commit_timestamp) { - std::unique_lock storage_guard(main_lock_); - auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access()); - if (ret.HasError() || ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) { - return ret; + if (!storage::DropExistenceConstraint(&constraints_, label, property)) { + return StorageExistenceConstraintDroppingError{ConstraintDefinitionError{}}; } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label, properties, commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::EXISTENCE_CONSTRAINT_DROP, label, + {property}, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return UniqueConstraints::CreationStatus::SUCCESS; + + if (success) { + return {}; + } + + return StorageExistenceConstraintDroppingError{ReplicationError{}}; } -UniqueConstraints::DeletionStatus Storage::DropUniqueConstraint( - LabelId label, const std::set &properties, const std::optional desired_commit_timestamp) { +utils::BasicResult +Storage::CreateUniqueConstraint(LabelId label, const std::set &properties, + const std::optional desired_commit_timestamp) { + std::unique_lock storage_guard(main_lock_); + auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access()); + if (ret.HasError()) { + return StorageUniqueConstraintDefinitionError{ret.GetError()}; + } + if (ret.GetValue() != UniqueConstraints::CreationStatus::SUCCESS) { + return ret.GetValue(); + } + const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_CREATE, label, + properties, commit_timestamp); + commit_log_->MarkFinished(commit_timestamp); + last_commit_timestamp_ = commit_timestamp; + + if (success) { + return UniqueConstraints::CreationStatus::SUCCESS; + } + + return StorageUniqueConstraintDefinitionError{ReplicationError{}}; +} + +utils::BasicResult +Storage::DropUniqueConstraint(LabelId label, const std::set &properties, + const std::optional desired_commit_timestamp) { std::unique_lock storage_guard(main_lock_); auto ret = constraints_.unique_constraints.DropConstraint(label, properties); if (ret != UniqueConstraints::DeletionStatus::SUCCESS) { return ret; } const auto commit_timestamp = CommitTimestamp(desired_commit_timestamp); - AppendToWal(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label, properties, commit_timestamp); + auto success = AppendToWalDataDefinition(durability::StorageGlobalOperation::UNIQUE_CONSTRAINT_DROP, label, + properties, commit_timestamp); commit_log_->MarkFinished(commit_timestamp); last_commit_timestamp_ = commit_timestamp; - return UniqueConstraints::DeletionStatus::SUCCESS; + + if (success) { + return UniqueConstraints::DeletionStatus::SUCCESS; + } + + return 
StorageUniqueConstraintDroppingError{ReplicationError{}}; } ConstraintsInfo Storage::ListAllConstraints() const { @@ -1605,8 +1685,10 @@ void Storage::FinalizeWalFile() { } } -void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp) { - if (!InitializeWalFile()) return; +bool Storage::AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp) { + if (!InitializeWalFile()) { + return true; + } // Traverse deltas and append them to the WAL file. // A single transaction will always be contained in a single WAL file. auto current_commit_timestamp = transaction.commit_timestamp->load(std::memory_order_acquire); @@ -1775,17 +1857,28 @@ void Storage::AppendToWal(const Transaction &transaction, uint64_t final_commit_ FinalizeWalFile(); + auto finalized_on_all_replicas = true; replication_clients_.WithLock([&](auto &clients) { for (auto &client : clients) { client->IfStreamingTransaction([&](auto &stream) { stream.AppendTransactionEnd(final_commit_timestamp); }); - client->FinalizeTransactionReplication(); + const auto finalized = client->FinalizeTransactionReplication(); + + if (client->Mode() == replication::ReplicationMode::SYNC) { + finalized_on_all_replicas = finalized && finalized_on_all_replicas; + } } }); + + return finalized_on_all_replicas; } -void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId label, - const std::set &properties, uint64_t final_commit_timestamp) { - if (!InitializeWalFile()) return; +bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label, + const std::set &properties, uint64_t final_commit_timestamp) { + if (!InitializeWalFile()) { + return true; + } + + auto finalized_on_all_replicas = true; wal_file_->AppendOperation(operation, label, properties, final_commit_timestamp); { if (replication_role_.load() == ReplicationRole::MAIN) { @@ -1794,12 +1887,17 @@ void Storage::AppendToWal(durability::StorageGlobalOperation operation, LabelId client->StartTransactionReplication(wal_file_->SequenceNumber()); client->IfStreamingTransaction( [&](auto &stream) { stream.AppendOperation(operation, label, properties, final_commit_timestamp); }); - client->FinalizeTransactionReplication(); + + const auto finalized = client->FinalizeTransactionReplication(); + if (client->Mode() == replication::ReplicationMode::SYNC) { + finalized_on_all_replicas = finalized && finalized_on_all_replicas; + } } }); } } FinalizeWalFile(); + return finalized_on_all_replicas; } utils::BasicResult Storage::CreateSnapshot() { diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp index 6aab1977f..0a6bf5c68 100644 --- a/src/storage/v2/storage.hpp +++ b/src/storage/v2/storage.hpp @@ -48,6 +48,7 @@ #include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/rpc.hpp" #include "storage/v2/replication/serialization.hpp" +#include "storage/v2/storage_error.hpp" namespace memgraph::storage { @@ -309,11 +310,14 @@ class Storage final { void AdvanceCommand(); - /// Commit returns `ConstraintViolation` if the changes made by this - /// transaction violate an existence or unique constraint. In that case the - /// transaction is automatically aborted. Otherwise, void is returned. + /// Returns void if the transaction has been committed. + /// Returns `StorageDataManipulationError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. 
+ /// * `ConstraintViolation`: the changes made by this transaction violate an existence or unique constraint. In this + /// case the transaction is automatically aborted. /// @throw std::bad_alloc - utils::BasicResult Commit(std::optional desired_commit_timestamp = {}); + utils::BasicResult Commit( + std::optional desired_commit_timestamp = {}); /// @throw std::bad_alloc void Abort(); @@ -352,54 +356,83 @@ class Storage final { /// @throw std::bad_alloc if unable to insert a new mapping EdgeTypeId NameToEdgeType(std::string_view name); + /// Create an index. + /// Returns void if the index has been created. + /// Returns `StorageIndexDefinitionError` if an error occures. Error can be: + /// * `IndexDefinitionError`: the index already exists. + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. /// @throw std::bad_alloc - bool CreateIndex(LabelId label, std::optional desired_commit_timestamp = {}); + utils::BasicResult CreateIndex( + LabelId label, std::optional desired_commit_timestamp = {}); + /// Create an index. + /// Returns void if the index has been created. + /// Returns `StorageIndexDefinitionError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `IndexDefinitionError`: the index already exists. /// @throw std::bad_alloc - bool CreateIndex(LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); + utils::BasicResult CreateIndex( + LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); - bool DropIndex(LabelId label, std::optional desired_commit_timestamp = {}); + /// Drop an existing index. + /// Returns void if the index has been dropped. + /// Returns `StorageIndexDefinitionError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `IndexDefinitionError`: the index does not exist. + utils::BasicResult DropIndex( + LabelId label, std::optional desired_commit_timestamp = {}); - bool DropIndex(LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); + /// Drop an existing index. + /// Returns void if the index has been dropped. + /// Returns `StorageIndexDefinitionError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `IndexDefinitionError`: the index does not exist. + utils::BasicResult DropIndex( + LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); IndicesInfo ListAllIndices() const; - /// Creates an existence constraint. Returns true if the constraint was - /// successfuly added, false if it already exists and a `ConstraintViolation` - /// if there is an existing vertex violating the constraint. - /// + /// Returns void if the existence constraint has been created. + /// Returns `StorageExistenceConstraintDefinitionError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `ConstraintViolation`: there is already a vertex existing that would break this new constraint. + /// * `ConstraintDefinitionError`: the constraint already exists. 
/// @throw std::bad_alloc /// @throw std::length_error - utils::BasicResult CreateExistenceConstraint( + utils::BasicResult CreateExistenceConstraint( LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); - /// Removes an existence constraint. Returns true if the constraint was - /// removed, and false if it doesn't exist. - bool DropExistenceConstraint(LabelId label, PropertyId property, - std::optional desired_commit_timestamp = {}); + /// Drop an existing existence constraint. + /// Returns void if the existence constraint has been dropped. + /// Returns `StorageExistenceConstraintDroppingError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `ConstraintDefinitionError`: the constraint did not exists. + utils::BasicResult DropExistenceConstraint( + LabelId label, PropertyId property, std::optional desired_commit_timestamp = {}); - /// Creates a unique constraint. In the case of two vertices violating the - /// constraint, it returns `ConstraintViolation`. Otherwise returns a - /// `UniqueConstraints::CreationStatus` enum with the following possibilities: - /// * `SUCCESS` if the constraint was successfully created, - /// * `ALREADY_EXISTS` if the constraint already existed, - /// * `EMPTY_PROPERTIES` if the property set is empty, or - // * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the - // limit of maximum number of properties. - /// + /// Create an unique constraint. + /// Returns `StorageUniqueConstraintDefinitionError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// * `ConstraintViolation`: there are already vertices violating the constraint. + /// Returns `UniqueConstraints::CreationStatus` otherwise. Value can be: + /// * `SUCCESS` if the constraint was successfully created, + /// * `ALREADY_EXISTS` if the constraint already existed, + /// * `EMPTY_PROPERTIES` if the property set is empty, or + /// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the limit of maximum number of properties. /// @throw std::bad_alloc - utils::BasicResult CreateUniqueConstraint( + utils::BasicResult CreateUniqueConstraint( LabelId label, const std::set &properties, std::optional desired_commit_timestamp = {}); - /// Removes a unique constraint. Returns `UniqueConstraints::DeletionStatus` - /// enum with the following possibilities: - /// * `SUCCESS` if constraint was successfully removed, - /// * `NOT_FOUND` if the specified constraint was not found, - /// * `EMPTY_PROPERTIES` if the property set is empty, or - /// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the - // limit of maximum number of properties. - UniqueConstraints::DeletionStatus DropUniqueConstraint(LabelId label, const std::set &properties, - std::optional desired_commit_timestamp = {}); + /// Removes an existing unique constraint. + /// Returns `StorageUniqueConstraintDroppingError` if an error occures. Error can be: + /// * `ReplicationError`: there is at least one SYNC replica that has not confirmed receiving the transaction. + /// Returns `UniqueConstraints::DeletionStatus` otherwise. 
Value can be: + /// * `SUCCESS` if constraint was successfully removed, + /// * `NOT_FOUND` if the specified constraint was not found, + /// * `EMPTY_PROPERTIES` if the property set is empty, or + /// * `PROPERTIES_SIZE_LIMIT_EXCEEDED` if the property set exceeds the limit of maximum number of properties. + utils::BasicResult DropUniqueConstraint( + LabelId label, const std::set &properties, std::optional desired_commit_timestamp = {}); ConstraintsInfo ListAllConstraints() const; @@ -474,9 +507,11 @@ class Storage final { bool InitializeWalFile(); void FinalizeWalFile(); - void AppendToWal(const Transaction &transaction, uint64_t final_commit_timestamp); - void AppendToWal(durability::StorageGlobalOperation operation, LabelId label, const std::set &properties, - uint64_t final_commit_timestamp); + /// Return true in all cases excepted if any sync replicas have not sent confirmation. + [[nodiscard]] bool AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp); + /// Return true in all cases excepted if any sync replicas have not sent confirmation. + [[nodiscard]] bool AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label, + const std::set &properties, uint64_t final_commit_timestamp); uint64_t CommitTimestamp(std::optional desired_commit_timestamp = {}); diff --git a/src/storage/v2/storage_error.hpp b/src/storage/v2/storage_error.hpp new file mode 100644 index 000000000..1e071f748 --- /dev/null +++ b/src/storage/v2/storage_error.hpp @@ -0,0 +1,38 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +#pragma once + +#include "storage/v2/constraints.hpp" + +#include + +namespace memgraph::storage { + +struct ReplicationError {}; + +using StorageDataManipulationError = std::variant; + +struct IndexDefinitionError {}; +using StorageIndexDefinitionError = std::variant; + +struct ConstraintDefinitionError {}; + +using StorageExistenceConstraintDefinitionError = + std::variant; + +using StorageExistenceConstraintDroppingError = std::variant; + +using StorageUniqueConstraintDefinitionError = std::variant; + +using StorageUniqueConstraintDroppingError = std::variant; + +} // namespace memgraph::storage diff --git a/tests/benchmark/expansion.cpp b/tests/benchmark/expansion.cpp index e6052efb5..5ff412eea 100644 --- a/tests/benchmark/expansion.cpp +++ b/tests/benchmark/expansion.cpp @@ -45,7 +45,7 @@ class ExpansionBenchFixture : public benchmark::Fixture { MG_ASSERT(!dba.Commit().HasError()); } - MG_ASSERT(db->CreateIndex(label)); + MG_ASSERT(!db->CreateIndex(label).HasError()); interpreter_context.emplace(&*db, memgraph::query::InterpreterConfig{}, data_directory); interpreter.emplace(&*interpreter_context); diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp index 90d12e78f..69aedaff9 100644 --- a/tests/benchmark/query/execution.cpp +++ b/tests/benchmark/query/execution.cpp @@ -83,7 +83,7 @@ static void AddStarGraph(memgraph::storage::Storage *db, int spoke_count, int de } MG_ASSERT(!dba.Commit().HasError()); } - MG_ASSERT(db->CreateIndex(db->NameToLabel(kStartLabel))); + MG_ASSERT(!db->CreateIndex(db->NameToLabel(kStartLabel)).HasError()); } static void AddTree(memgraph::storage::Storage *db, int vertex_count) { @@ -105,7 +105,7 @@ static void AddTree(memgraph::storage::Storage *db, int vertex_count) { } MG_ASSERT(!dba.Commit().HasError()); } - MG_ASSERT(db->CreateIndex(db->NameToLabel(kStartLabel))); + MG_ASSERT(!db->CreateIndex(db->NameToLabel(kStartLabel)).HasError()); } static memgraph::query::CypherQuery *ParseCypherQuery(const std::string &query_string, diff --git a/tests/concurrent/storage_indices.cpp b/tests/concurrent/storage_indices.cpp index 017442fdc..8c50ef18b 100644 --- a/tests/concurrent/storage_indices.cpp +++ b/tests/concurrent/storage_indices.cpp @@ -16,6 +16,7 @@ #include #include "storage/v2/storage.hpp" +#include "storage/v2/storage_error.hpp" #include "utils/thread.hpp" const uint64_t kNumVerifiers = 5; @@ -29,7 +30,7 @@ TEST(Storage, LabelIndex) { auto store = memgraph::storage::Storage(); auto label = store.NameToLabel("label"); - ASSERT_TRUE(store.CreateIndex(label)); + ASSERT_FALSE(store.CreateIndex(label).HasError()); std::vector verifiers; verifiers.reserve(kNumVerifiers); @@ -111,7 +112,7 @@ TEST(Storage, LabelPropertyIndex) { auto label = store.NameToLabel("label"); auto prop = store.NameToProperty("prop"); - ASSERT_TRUE(store.CreateIndex(label, prop)); + ASSERT_FALSE(store.CreateIndex(label, prop).HasError()); std::vector verifiers; verifiers.reserve(kNumVerifiers); diff --git a/tests/e2e/configuration/default_config.py b/tests/e2e/configuration/default_config.py index 34e2f1763..2fdfca78a 100644 --- a/tests/e2e/configuration/default_config.py +++ b/tests/e2e/configuration/default_config.py @@ -121,7 +121,6 @@ startup_config_dict = { "false", "Controls whether the storage recovers persisted data on startup.", ), - "storage_restore_replicas_on_startup": ("true", "true", "Controls replicas should be restored automatically."), "storage_snapshot_interval_sec": ( "0", "300", diff --git a/tests/e2e/mg_utils.py 
b/tests/e2e/mg_utils.py
index 9159d3e2f..9eec91da0 100644
--- a/tests/e2e/mg_utils.py
+++ b/tests/e2e/mg_utils.py
@@ -8,7 +8,9 @@ def mg_sleep_and_assert(expected_value, function_to_retrieve_data, max_duration=
         current_time = time.time()
         duration = current_time - start_time
         if duration > max_duration:
-            assert False, " mg_sleep_and_assert has tried for too long and did not get the expected result!"
+            assert (
+                False
+            ), f" mg_sleep_and_assert has tried for too long and did not get the expected result! Last result was: {result}"
 
         time.sleep(time_between_attempt)
         result = function_to_retrieve_data()
diff --git a/tests/e2e/replication/show_while_creating_invalid_state.py
index f4b0af35f..d57f556f2 100644
--- a/tests/e2e/replication/show_while_creating_invalid_state.py
+++ b/tests/e2e/replication/show_while_creating_invalid_state.py
@@ -13,6 +13,7 @@ import sys
 import os
 
 import pytest
+import random
 from common import execute_and_fetch_all
 from mg_utils import mg_sleep_and_assert
 
@@ -137,8 +138,11 @@ def test_basic_recovery(connection):
     # 9/ We add some data to main.
     # 10/ We re-add the two replicas droped/killed and check the data.
     # 11/ We kill another replica.
-    # 12/ Add some more data to main.
-    # 13/ Check the states of replicas.
+    # 12/ Add some more data to main. The write must still occur on main, but an exception is expected since one SYNC replica is down.
+    # 13/ Restart the replica.
+    # 14/ Check the states of replicas.
+    # 15/ Add some data again.
+    # 16/ Check the data is added to all replicas.
 
     # 0/
     data_directory = tempfile.TemporaryDirectory()
@@ -263,10 +267,7 @@ def test_basic_recovery(connection):
         ("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
     }
 
-    def retrieve_data2():
-        return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
-
-    actual_data = mg_sleep_and_assert(expected_data, retrieve_data2)
+    actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
     assert actual_data == expected_data
 
     for index in (1, 2, 3, 4):
@@ -281,29 +282,63 @@ def test_basic_recovery(connection):
         ("replica_4", "127.0.0.1:10004", "async", 6, 0, "ready"),
     }
 
-    def retrieve_data3():
-        return set(execute_and_fetch_all(cursor, "SHOW REPLICAS;"))
-
-    actual_data = mg_sleep_and_assert(expected_data, retrieve_data3)
+    actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
     assert actual_data == expected_data
 
     # 12/
-    execute_and_fetch_all(cursor, "CREATE (p1:Number {name:'Magic_again_again', value:44})")
-    res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK)
-    assert len(res_from_main) == 3
-    for index in (2, 3, 4):
-        assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main
-
-    # 13/
+    with pytest.raises(mgclient.DatabaseError):
+        interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(
+            "CREATE (p1:Number {name:'Magic_again_again', value:44})"
+        )
     expected_data = {
         ("replica_1", "127.0.0.1:10001", "sync", 0, 0, "invalid"),
         ("replica_2", "127.0.0.1:10002", "sync", 9, 0, "ready"),
         ("replica_3", "127.0.0.1:10003", "async", 9, 0, "ready"),
         ("replica_4", "127.0.0.1:10004", "async", 9, 0, "ready"),
     }
+
+    actual_data = mg_sleep_and_assert(expected_data, retrieve_data)
+    assert actual_data == expected_data
+
+    # 13/
+    interactive_mg_runner.start(CONFIGURATION, "replica_1")
+
+    # 14/
+    expected_data = {
+        ("replica_1", "127.0.0.1:10001", "sync", 9, 0, "ready"),
+        ("replica_2", "127.0.0.1:10002", "sync", 9, 0, "ready"),
+        ("replica_3", "127.0.0.1:10003", "async", 9, 0, "ready"),
("replica_4", "127.0.0.1:10004", "async", 9, 0, "ready"), + } + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + print("actual=", actual_data) + assert actual_data == expected_data + + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 3 + for index in (1, 2, 3, 4): + assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main + + # 15/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query( + "CREATE (p1:Number {name:'Magic_again_again_again', value:45})" + ) + + # 16/ + expected_data = { + ("replica_1", "127.0.0.1:10001", "sync", 12, 0, "ready"), + ("replica_2", "127.0.0.1:10002", "sync", 12, 0, "ready"), + ("replica_3", "127.0.0.1:10003", "async", 12, 0, "ready"), + ("replica_4", "127.0.0.1:10004", "async", 12, 0, "ready"), + } actual_data = set(execute_and_fetch_all(cursor, "SHOW REPLICAS;")) assert actual_data == expected_data + res_from_main = execute_and_fetch_all(cursor, QUERY_TO_CHECK) + assert len(res_from_main) == 4 + for index in (1, 2, 3, 4): + assert interactive_mg_runner.MEMGRAPH_INSTANCES[f"replica_{index}"].query(QUERY_TO_CHECK) == res_from_main + def test_conflict_at_startup(connection): # Goal of this test is to check starting up several instance with different replicas' configuration directory works as expected. @@ -334,7 +369,7 @@ def test_conflict_at_startup(connection): assert execute_and_fetch_all(cursor_2, "SHOW REPLICATION ROLE;")[0][0] == "main" -def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection): +def test_basic_recovery_when_replica_is_kill_when_main_is_down(): # Goal of this test is to check the recovery of main. # 0/ We start all replicas manually: we want to be able to kill them ourselves without relying on external tooling to kill processes. # 1/ We check that all replicas have the correct state: they should all be ready. @@ -403,5 +438,1012 @@ def test_basic_recovery_when_replica_is_kill_when_main_is_down(connection): assert actual_data == expected_data +def test_async_replication_when_main_is_killed(): + # Goal of the test is to check that when main is randomly killed: + # -the ASYNC replica always contains a valid subset of data of main. + # We run the test 20 times, it should never fail. + + # 0/ Start main and replicas. + # 1/ Register replicas. + # 2/ Insert data in main, and randomly kill it. + # 3/ Check that the ASYNC replica has a valid subset. 
+
+    for test_repetition in range(20):
+        # 0/
+        data_directory_main = tempfile.TemporaryDirectory()
+        data_directory_replica = tempfile.TemporaryDirectory()
+        CONFIGURATION = {
+            "async_replica": {
+                "args": ["--bolt-port", "7688", "--log-level=TRACE"],
+                "log_file": "async_replica.log",
+                "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"],
+                "data_directory": f"{data_directory_replica.name}",
+            },
+            "main": {
+                "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"],
+                "log_file": "main.log",
+                "setup_queries": [],
+                "data_directory": f"{data_directory_main.name}",
+            },
+        }
+
+        interactive_mg_runner.start_all(CONFIGURATION)
+
+        # 1/
+        interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(
+            "REGISTER REPLICA async_replica ASYNC TO '127.0.0.1:10001';"
+        )
+
+        # 2/
+        for index in range(50):
+            interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})")
+            if random.randint(0, 100) > 95:
+                main_killed = f"Main was killed at index={index}"
+                interactive_mg_runner.kill(CONFIGURATION, "main")
+                break
+
+        # 3/
+        # Short explanation:
+        # res_from_async_replica is an arithmetic sequence with:
+        # -first term 0
+        # -common difference 1
+        # So we check its properties. If the properties are fulfilled, it means the ASYNC replica received a correct
+        # subset of messages from main, in the correct order.
+        # In other words: res_from_async_replica looks like [0, 1, ..., n-1, n], where the values are consecutive integers.
+        # It should have the two properties:
+        # -the list is sorted
+        # -the sum of all elements is equal to nOfTerms * (firstTerm + lastTerm) / 2
+
+        QUERY_TO_CHECK = "MATCH (n) RETURN COLLECT(n.name);"
+        res_from_async_replica = interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica"].query(QUERY_TO_CHECK)[0][0]
+        assert res_from_async_replica == sorted(res_from_async_replica), main_killed
+        total_sum = sum(res_from_async_replica)
+        expected_sum = len(res_from_async_replica) * (res_from_async_replica[0] + res_from_async_replica[-1]) / 2
+        assert total_sum == expected_sum, main_killed
+
+        data_directory_main.cleanup()
+        data_directory_replica.cleanup()
+
+
+def test_sync_replication_when_main_is_killed():
+    # Goal of the test is to check that when main is randomly killed:
+    # -the SYNC replica always contains the exact data that was in main.
+    # We run the test 20 times, it should never fail.
+
+    # 0/ Start main and replica.
+    # 1/ Register replica.
+    # 2/ Insert data in main, and randomly kill it.
+    # 3/ Check that the SYNC replica has exactly the same data as main.
+ + for test_repetition in range(20): + # 0/ + data_directory_main = tempfile.TemporaryDirectory() + data_directory_replica = tempfile.TemporaryDirectory() + CONFIGURATION = { + "sync_replica": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + "data_directory": f"{data_directory_replica.name}", + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [], + "data_directory": f"{data_directory_main.name}", + }, + } + + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query( + "REGISTER REPLICA sync_replica SYNC TO '127.0.0.1:10001';" + ) + + # 2/ + QUERY_TO_CHECK = "MATCH (n) RETURN COLLECT(n.name);" + last_result_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK)[0][0] + for index in range(50): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(f"CREATE (p:Number {{name:{index}}})") + last_result_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK)[0][0] + if random.randint(0, 100) > 95: + main_killed = f"Main was killed at index={index}" + interactive_mg_runner.kill(CONFIGURATION, "main") + break + + # 3/ + # The SYNC replica should have exactly the same data than main. + res_from_sync_replica = interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica"].query(QUERY_TO_CHECK)[0][0] + assert last_result_from_main == res_from_sync_replica, main_killed + + data_directory_main.cleanup() + data_directory_replica.cleanup() + + +def test_attempt_to_write_data_on_main_when_async_replica_is_down(): + # Goal of this test is to check that main can write new data if an async replica is down. + # 0/ Start main and async replicas. + # 1/ Check status of replicas. + # 2/ Add some nodes to main and check it is propagated to the async_replicas. + # 3/ Kill an async replica. + # 4/ Try to add some data to main. + # 5/ Check the status of replicas. + # 6/ Check that the data was added to main and remaining replica. 
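The replica-down tests below poll SHOW REPLICAS through mg_sleep_and_assert (the helper extended earlier in this patch to report the last observed value) and compare only the columns that stay meaningful while a replica is unreachable. A condensed sketch of that pattern, assuming the 6-column row layout (name, socket address, sync mode, timestamp, timestamp behind main, status) and the interactive_mg_runner / mg_sleep_and_assert helpers already imported by this test module:

def replica_status_summary():
    # Project each SHOW REPLICAS row down to (name, mode, timestamp_behind_main, status),
    # keeping only the columns the tests assert on.
    rows = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")
    return [(name, mode, behind, status) for name, _address, mode, _timestamp, behind, status in rows]

expected = [
    ("sync_replica1", "sync", 0, "invalid"),
    ("sync_replica2", "sync", 0, "ready"),
]
# Retries replica_status_summary until it returns `expected` or the timeout expires;
# on timeout the assertion message now includes the last result that was seen.
assert mg_sleep_and_assert(expected, replica_status_summary) == expected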
+ + CONFIGURATION = { + "async_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "async_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "async_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "async_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA async_replica2 ASYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + expected_data = { + ("async_replica1", "127.0.0.1:10001", "async", 0, 0, "ready"), + ("async_replica2", "127.0.0.1:10002", "async", 0, 0, "ready"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 2/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE (p:Number {name:1});") + + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) + + # 3/ + interactive_mg_runner.kill(CONFIGURATION, "async_replica1") + + # 4/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE (p:Number {name:2});") + + # 5/ + expected_data = [ + ("async_replica1", "async", 0, "invalid"), + ("async_replica2", "async", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 6/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) + + +def test_attempt_to_write_data_on_main_when_sync_replica_is_down(): + # Goal of this test is to check that main cannot write new data if a sync replica is down. + # 0/ Start main and sync replicas. + # 1/ Check status of replicas. + # 2/ Add some nodes to main and check it is propagated to the sync_replicas. + # 3/ Kill a sync replica. + # 4/ Add some data to main. It should be added to main and replica2 + # 5/ Check the status of replicas. + # 6/ Restart the replica that was killed and check that it is up to date with main. 
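Every SYNC-replica test below leans on the same behavior introduced by this patch: when a SYNC replica cannot confirm, the query surfaces a replication error to the client, yet the transaction is still committed on main and on the reachable replicas. A compact sketch of that assertion pattern; the helper name and parameters are illustrative only:

import pytest
import mgclient

def assert_write_survives_sync_replica_outage(main, write_query, check_query, expected_count):
    # The client sees a DatabaseError because at least one SYNC replica has not
    # confirmed the commit ...
    with pytest.raises(mgclient.DatabaseError):
        main.query(write_query)
    # ... but the write itself is not rolled back: main still returns the new data.
    assert len(main.query(check_query)) == expected_count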
+ + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + expected_data = { + ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("sync_replica2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 2/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE (p:Number {name:1});") + + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 3/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 4/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE (p:Number {name:2});") + + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 5/ + expected_data = { + ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("sync_replica2", "127.0.0.1:10002", "sync", 5, 0, "ready"), + } + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 6/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_attempt_to_create_indexes_on_main_when_async_replica_is_down(): + # Goal of this test is to check 
that main can create new indexes/constraints if an async replica is down. + # 0/ Start main and async replicas. + # 1/ Check status of replicas. + # 2/ Add some indexes to main and check it is propagated to the async_replicas. + # 3/ Kill an async replica. + # 4/ Try to add some more indexes to main. + # 5/ Check the status of replicas. + # 6/ Check that the indexes were added to main and remaining replica. + + CONFIGURATION = { + "async_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "async_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "async_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "async_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA async_replica1 ASYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA async_replica2 ASYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + expected_data = { + ("async_replica1", "127.0.0.1:10001", "async", 0, 0, "ready"), + ("async_replica2", "127.0.0.1:10002", "async", 0, 0, "ready"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 2/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE INDEX ON :Number(value);") + + QUERY_TO_CHECK = "SHOW INDEX INFO;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) + + # 3/ + interactive_mg_runner.kill(CONFIGURATION, "async_replica1") + + # 4/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE INDEX ON :Number(value2);") + + # 5/ + expected_data = [ + ("async_replica1", "async", 0, "invalid"), + ("async_replica2", "async", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 6/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["async_replica2"].query(QUERY_TO_CHECK) + + +def test_attempt_to_create_indexes_on_main_when_sync_replica_is_down(): + # Goal of this test is to check that main cannot create new indexes/constraints if a sync replica is down. + # 0/ Start main and sync replicas. + # 1/ Check status of replicas. + # 2/ Add some indexes to main and check it is propagated to the sync_replicas. + # 3/ Kill a sync replica. + # 4/ Add some more indexes to main. It should be added to main and replica2 + # 5/ Check the status of replicas. + # 6/ Restart the replica that was killed and check that it is up to date with main. 
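The index tests compare the full SHOW INDEX INFO output of main against every replica, since index and constraint definitions are replicated alongside data in this patch. A small helper expressing that comparison; it is hypothetical, the tests below inline the same assertions instead:

def assert_same_index_info(main, replicas):
    # Every live replica must report exactly the same index definitions as main.
    reference = main.query("SHOW INDEX INFO;")
    for replica in replicas:
        assert replica.query("SHOW INDEX INFO;") == reference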
+ + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + expected_data = { + ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "ready"), + ("sync_replica2", "127.0.0.1:10002", "sync", 0, 0, "ready"), + } + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 2/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE INDEX ON :Number(value);") + + QUERY_TO_CHECK = "SHOW INDEX INFO;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 3/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 4/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("CREATE INDEX ON :Number(value2);") + + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 5/ + expected_data = { + ("sync_replica1", "127.0.0.1:10001", "sync", 0, 0, "invalid"), + ("sync_replica2", "127.0.0.1:10002", "sync", 2, 0, "ready"), + } + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + actual_data = set(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;")) + assert actual_data == expected_data + + # 6/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_trigger_on_create_before_commit_with_offline_sync_replica(): + # 0/ Start all. 
+ # 1/ Create the trigger + # 2/ Create a node. We expect two nodes created (our Not_Magic and the Magic created by trigger). + # 3/ Check the nodes + # 4/ We remove all nodes. + # 5/ Kill a replica and check that it's offline. + # 6/ Create new node. + # 7/ Check that we have two nodes. + # 8/ Re-start the replica and check it's online and that it has two nodes. + + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + QUERY_CREATE_TRIGGER = """ + CREATE TRIGGER exampleTrigger + ON CREATE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger'}); + """ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER) + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") + assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + + # 2/ + QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic'})" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 3/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2, f"Incorect result: {res_from_main}" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 4/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("MATCH (n) DETACH DELETE n;") + + # 5/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 6/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 7/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 8/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == 
interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_trigger_on_update_before_commit_with_offline_sync_replica(): + # 0/ Start all. + # 1/ Create the trigger + # 2/ Create a node. + # 3/ Update the node: we expect another node to be created + # 4/ Check the nodes + # 5/ We remove all nodes and create new node again. + # 6/ Kill a replica and check that it's offline. + # 7/ Update the node. + # 8/ Check that we have two nodes. + # 9/ Re-start the replica and check it's online and that it has two nodes. + + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + # 1/ + QUERY_CREATE_TRIGGER = """ + CREATE TRIGGER exampleTrigger + ON UPDATE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger'}); + """ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER) + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") + assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + + # 2/ + QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic', value:0})" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 3/ + QUERY_TO_UPDATE = "MATCH (node:Number {name:'Not_Magic'}) SET node.value=1 return node;" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_UPDATE) + + # 4/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2, f"Incorect result: {res_from_main}" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 5/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("MATCH (n) DETACH DELETE n;") + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 6/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 7/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_UPDATE) + + # 8/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert 
res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 9/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 2 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_trigger_on_delete_before_commit_with_offline_sync_replica(): + # 0/ Start all. + # 1/ Create the trigger + # 2/ Create a node. + # 3/ Delete the node: we expect another node to be created + # 4/ Check that we have one node. + # 5/ We remove all triggers and all nodes and create new trigger and node again. + # 6/ Kill a replica and check that it's offline. + # 7/ Delete the node. + # 8/ Check that we have one node. + # 9/ Re-start the replica and check it's online and that it has one node, and the correct one. + + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + QUERY_CREATE_TRIGGER = """ + CREATE TRIGGER exampleTrigger + ON DELETE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger'}); + """ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER) + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") + assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + + # 2/ + QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic', value:0})" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 3/ + QUERY_TO_DELETE = "MATCH (n) DETACH DELETE n;" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_DELETE) + + # 4/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + assert res_from_main[0][0].properties["name"] == "Node_created_by_trigger" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 5/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("DROP TRIGGER exampleTrigger;") + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("MATCH (n) DETACH DELETE n;") + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER) + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 6/ + 
interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 7/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_DELETE) + + # 8/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1, f"Incorect result: {res_from_main}" + assert res_from_main[0][0].properties["name"] == "Node_created_by_trigger" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 9/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 1 + assert res_from_main[0][0].properties["name"] == "Node_created_by_trigger" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_trigger_on_create_before_and_after_commit_with_offline_sync_replica(): + # 0/ Start all. + # 1/ Create the triggers + # 2/ Create a node. We expect three nodes created (1 node created + the two created by triggers). + # 3/ Check the nodes + # 4/ We remove all nodes. + # 5/ Kill a replica and check that it's offline. + # 6/ Create new node. + # 7/ Check that we have three nodes. + # 8/ Re-start the replica and check it's online and that it has three nodes. 
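The trigger tests that follow count nodes after each commit: every ON CREATE trigger defined in the test adds exactly one extra node, so a single CREATE combined with a BEFORE COMMIT and an AFTER COMMIT trigger, or with two BEFORE COMMIT triggers, is expected to leave three nodes. A trivial sketch of that bookkeeping, for illustration only:

def expected_nodes_after_create(user_created, on_create_triggers):
    # One node per explicit CREATE plus one node per ON CREATE trigger firing,
    # regardless of BEFORE/AFTER COMMIT timing.
    return user_created + on_create_triggers

assert expected_nodes_after_create(1, 2) == 3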
+ + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + QUERY_CREATE_TRIGGER_BEFORE = """ + CREATE TRIGGER exampleTriggerBefore + ON CREATE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger_before'}); + """ + QUERY_CREATE_TRIGGER_AFTER = """ + CREATE TRIGGER exampleTriggerAfter + ON CREATE AFTER COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger_after'}); + """ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER_BEFORE) + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER_AFTER) + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") + assert len(res_from_main) == 2, f"Incorect result: {res_from_main}" + + # 2/ + QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic'})" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 3/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 3, f"Incorect result: {res_from_main}" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 4/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("MATCH (n) DETACH DELETE n;") + + # 5/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 6/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 7/ + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 3 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 8/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 3 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main 
== interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + +def test_triggers_on_create_before_commit_with_offline_sync_replica(): + # 0/ Start all. + # 1/ Create the two triggers + # 2/ Create a node. We expect three nodes. + # 3/ Check the nodes + # 4/ We remove all nodes. + # 5/ Kill a replica and check that it's offline. + # 6/ Create new node. + # 7/ Check that we have three nodes. + # 8/ Re-start the replica and check it's online and that it has two nodes. + + CONFIGURATION = { + "sync_replica1": { + "args": ["--bolt-port", "7688", "--log-level=TRACE"], + "log_file": "sync_replica1.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"], + }, + "sync_replica2": { + "args": ["--bolt-port", "7689", "--log-level=TRACE"], + "log_file": "sync_replica2.log", + "setup_queries": ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"], + }, + "main": { + "args": ["--bolt-port", "7687", "--log-level=TRACE", "--storage-recover-on-startup=true"], + "log_file": "main.log", + "setup_queries": [ + "REGISTER REPLICA sync_replica1 SYNC TO '127.0.0.1:10001';", + "REGISTER REPLICA sync_replica2 SYNC TO '127.0.0.1:10002';", + ], + }, + } + + # 0/ + interactive_mg_runner.start_all(CONFIGURATION) + + # 1/ + QUERY_CREATE_TRIGGER_FIRST = """ + CREATE TRIGGER exampleTriggerFirst + ON CREATE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger_first'}); + """ + QUERY_CREATE_TRIGGER_SECOND = """ + CREATE TRIGGER exampleTriggerSecond + ON CREATE BEFORE COMMIT EXECUTE + CREATE (p:Number {name:'Node_created_by_trigger_second'}); + """ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER_FIRST) + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_TRIGGER_SECOND) + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW TRIGGERS;") + assert len(res_from_main) == 2, f"Incorect result: {res_from_main}" + + # 2/ + QUERY_CREATE_NODE = "CREATE (p:Number {name:'Not_Magic'})" + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 3/ + QUERY_TO_CHECK = "MATCH (node) return node;" + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 3, f"Incorect result: {res_from_main}" + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 4/ + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("MATCH (n) DETACH DELETE n;") + + # 5/ + interactive_mg_runner.kill(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "invalid"), + ("sync_replica2", "sync", 0, "ready"), + ] + + def retrieve_data(): + replicas = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query("SHOW REPLICAS;") + return [ + (replica_name, mode, timestamp_behind_main, status) + for replica_name, ip, mode, timestamp, timestamp_behind_main, status in replicas + ] + + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + + # 6/ + with pytest.raises(mgclient.DatabaseError): + interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_CREATE_NODE) + + # 7/ + def get_number_of_nodes(): + return len(interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK)) + + mg_sleep_and_assert(3, get_number_of_nodes) + + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert 
len(res_from_main) == 3 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + # 8/ + interactive_mg_runner.start(CONFIGURATION, "sync_replica1") + expected_data = [ + ("sync_replica1", "sync", 0, "ready"), + ("sync_replica2", "sync", 0, "ready"), + ] + actual_data = mg_sleep_and_assert(expected_data, retrieve_data) + assert actual_data == expected_data + res_from_main = interactive_mg_runner.MEMGRAPH_INSTANCES["main"].query(QUERY_TO_CHECK) + assert len(res_from_main) == 3 + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica1"].query(QUERY_TO_CHECK) + assert res_from_main == interactive_mg_runner.MEMGRAPH_INSTANCES["sync_replica2"].query(QUERY_TO_CHECK) + + if __name__ == "__main__": sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/jepsen/src/jepsen/memgraph/bank.clj b/tests/jepsen/src/jepsen/memgraph/bank.clj index 4b5955903..e2e830bf8 100644 --- a/tests/jepsen/src/jepsen/memgraph/bank.clj +++ b/tests/jepsen/src/jepsen/memgraph/bank.clj @@ -6,6 +6,7 @@ should be consistent." (:require [neo4j-clj.core :as dbclient] [clojure.tools.logging :refer [info]] + [clojure.string :as string] [jepsen [client :as client] [checker :as checker] [generator :as gen]] @@ -80,13 +81,21 @@ :ok :fail))) (catch Exception e - ; Transaction can fail on serialization errors - (assoc op :type :fail :info (str e)))) + (if (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.") + (assoc op :type :ok :info (str e)); Exception due to down sync replica is accepted/expected + (assoc op :type :fail :info (str e))) + )) (assoc op :type :fail)))) (teardown! [this test] (when (= replication-role :main) (c/with-session conn session - (c/detach-delete-all session)))) + (try + (c/detach-delete-all session) + (catch Exception e + (if-not (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.") + (throw (Exception. (str "Invalid exception when deleting all nodes: " e)))); Exception due to down sync replica is accepted/expected + ) + )))) (close! [_ est] (dbclient/disconnect conn))) diff --git a/tests/jepsen/src/jepsen/memgraph/basic.clj b/tests/jepsen/src/jepsen/memgraph/basic.clj index dea828a64..3f4b1153b 100644 --- a/tests/jepsen/src/jepsen/memgraph/basic.clj +++ b/tests/jepsen/src/jepsen/memgraph/basic.clj @@ -1,6 +1,7 @@ (ns jepsen.memgraph.basic "Basic Memgraph test" - (:require [neo4j-clj.core :as dbclient] + (:require [neo4j-clj.core :as dbclient] + [clojure.string :as string] [jepsen [client :as client] [checker :as checker] [generator :as gen]] @@ -53,7 +54,13 @@ (assoc op :type :fail, :error :not-found))))) (teardown! [this test] (c/with-session conn session - (detach-delete-all session))) + (try + (c/detach-delete-all session) + (catch Exception e + (if-not (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.") + (throw (Exception. (str "Invalid exception when deleting all nodes: " e)))); Exception due to down sync replica is accepted/expected + ) + ))) (close! 
[_ est] (dbclient/disconnect conn))) @@ -73,4 +80,3 @@ :timeline (timeline/html)}) :generator (gen/mix [r w cas]) :final-generator (gen/once r)}) - diff --git a/tests/jepsen/src/jepsen/memgraph/client.clj b/tests/jepsen/src/jepsen/memgraph/client.clj index 176f2433f..580988abd 100644 --- a/tests/jepsen/src/jepsen/memgraph/client.clj +++ b/tests/jepsen/src/jepsen/memgraph/client.clj @@ -40,11 +40,11 @@ name " " (replication-mode-str node-config) - " TO \"" + " TO '" (:ip node-config) ":" (:port node-config) - "\""))) + "'"))) (defn create-set-replica-role-query [port] @@ -103,12 +103,12 @@ (doseq [n (filter #(= (:replication-role (val %)) :replica) node-config)] - (try + (try (c/with-session conn session ((c/create-register-replica-query (first n) (second n)) session)) - (catch Exception e))) + (catch Exception e))) (assoc op :type :ok)) (assoc op :type :fail))) cases)) diff --git a/tests/jepsen/src/jepsen/memgraph/core.clj b/tests/jepsen/src/jepsen/memgraph/core.clj index 545367bcc..816cb19d6 100644 --- a/tests/jepsen/src/jepsen/memgraph/core.clj +++ b/tests/jepsen/src/jepsen/memgraph/core.clj @@ -13,7 +13,6 @@ [jepsen.memgraph [basic :as basic] [bank :as bank] [large :as large] - [sequential :as sequential] [support :as s] [nemesis :as nemesis] [edn :as e]])) @@ -22,7 +21,6 @@ "A map of workload names to functions that can take opts and construct workloads." {:bank bank/workload - ;; :sequential sequential/workload (T0532-MG) :large large/workload}) (def nemesis-configuration diff --git a/tests/jepsen/src/jepsen/memgraph/large.clj b/tests/jepsen/src/jepsen/memgraph/large.clj index cc710252a..97aaa407d 100644 --- a/tests/jepsen/src/jepsen/memgraph/large.clj +++ b/tests/jepsen/src/jepsen/memgraph/large.clj @@ -2,6 +2,7 @@ "Large write test" (:require [neo4j-clj.core :as dbclient] [clojure.tools.logging :refer [info]] + [clojure.string :as string] [jepsen [client :as client] [checker :as checker] [generator :as gen]] @@ -40,13 +41,27 @@ :node node})) :add (if (= replication-role :main) (c/with-session conn session - (create-nodes session) - (assoc op :type :ok)) + (try + ((create-nodes session) + (assoc op :type :ok)) + (catch Exception e + (if (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.") + (assoc op :type :ok :info (str e)); Exception due to down sync replica is accepted/expected + (assoc op :type :fail :info (str e))) + ) + ) + ) (assoc op :type :fail)))) (teardown! [this test] (when (= replication-role :main) (c/with-session conn session - (c/detach-delete-all session)))) + (try + (c/detach-delete-all session) + (catch Exception e + (if-not (string/includes? (str e) "At least one SYNC replica has not confirmed committing last transaction.") + (throw (Exception. (str "Invalid exception when deleting all nodes: " e)))); Exception due to down sync replica is accepted/expected + ) + )))) (close! 
[_ est] (dbclient/disconnect conn))) diff --git a/tests/jepsen/src/jepsen/memgraph/sequential.clj b/tests/jepsen/src/jepsen/memgraph/sequential.clj deleted file mode 100644 index d50a241cb..000000000 --- a/tests/jepsen/src/jepsen/memgraph/sequential.clj +++ /dev/null @@ -1,154 +0,0 @@ -(ns jepsen.memgraph.sequential - "Sequential test" - (:require [neo4j-clj.core :as dbclient] - [clojure.tools.logging :refer [info]] - [jepsen [client :as client] - [checker :as checker] - [generator :as gen]] - [jepsen.checker.timeline :as timeline] - [jepsen.memgraph.client :as c])) - -(dbclient/defquery get-all-nodes - "MATCH (n:Node) RETURN n ORDER BY n.id;") - -(dbclient/defquery create-node - "CREATE (n:Node {id: $id});") - -(dbclient/defquery delete-node-with-id - "MATCH (n:Node {id: $id}) DELETE n;") - -(def next-node-for-add (atom 0)) - -(defn add-next-node - "Add a new node with its id set to the next highest" - [conn] - (when (dbclient/with-transaction conn tx - (create-node tx {:id (swap! next-node-for-add identity)})) - (swap! next-node-for-add inc))) - -(def next-node-for-delete (atom 0)) - -(defn delete-oldest-node - "Delete a node with the lowest id" - [conn] - (when (dbclient/with-transaction conn tx - (delete-node-with-id tx {:id (swap! next-node-for-delete identity)})) - (swap! next-node-for-delete inc))) - -(c/replication-client Client [] - (open! [this test node] - (c/replication-open-connection this node node-config)) - (setup! [this test] - (when (= replication-role :main) - (c/with-session conn session - (c/detach-delete-all session) - (create-node session {:id 0})))) - (invoke! [this test op] - (c/replication-invoke-case (:f op) - :read (c/with-session conn session - (assoc op - :type :ok - :value {:ids (->> (get-all-nodes session) - (map #(-> % :n :id)) - (reduce conj [])) - :node node})) - :add (if (= replication-role :main) - (try - (assoc op :type (if (add-next-node conn) :ok :fail)) - (catch Exception e - ; Transaction can fail on serialization errors - (assoc op :type :fail :info (str e)))) - (assoc op :type :fail)) - :delete (if (= replication-role :main) - (try - (assoc op :type (if (delete-oldest-node conn) :ok :fail)) - (catch Exception e - ; Transaction can fail on serialization errors - (assoc op :type :fail :info (str e)))) - (assoc op :type :fail)))) - (teardown! [this test] - (when (= replication-role :main) - (c/with-session conn session - (c/detach-delete-all session)))) - (close! [_ est] - (dbclient/disconnect conn))) - -(defn add-node - "Add node with id set to current_max_id + 1" - [test process] - {:type :invoke :f :add :value nil}) - -(defn read-ids - "Read all current ids of nodes" - [test process] - {:type :invoke :f :read :value nil}) - -(defn delete-node - "Delete node with the lowest id" - [test process] - {:type :invoke :f :delete :value nil}) - -(defn strictly-increasing - [coll] - (every? - #(< (first %) (second %)) - (partition 2 1 coll))) - -(defn increased-by-1 - [coll] - (every? - #(= (inc (first %)) (second %)) - (partition 2 1 coll))) - -(defn sequential-checker - "Check if all nodes have nodes with ids that are strictly increasing by 1. - All nodes need to have at leas 1 non-empty read." 
- [] - (reify checker/Checker - (check [this test history opts] - (let [ok-reads (->> history - (filter #(= :ok (:type %))) - (filter #(= :read (:f %)))) - bad-reads (->> ok-reads - (map (fn [op] - (let [ids (-> op :value :ids)] - (when (not-empty ids) - (cond ((complement strictly-increasing) ids) - {:type :not-increasing-ids - :op op}))))) - - ;; if there are multiple threads not sure how to guarante that the ids are created in order - ;;((complement increased-by-1) ids) - ;;{:type :ids-missing - ;; :op op}))))) - (filter identity) - (into [])) - empty-nodes (let [all-nodes (->> ok-reads - (map #(-> % :value :node)) - (reduce conj #{}))] - (->> all-nodes - (filter (fn [node] - (every? - empty? - (->> ok-reads - (map :value) - (filter #(= node (:node %))) - (map :ids))))) - (filter identity) - (into [])))] - {:valid? (and - (empty? bad-reads) - (empty? empty-nodes)) - :empty-nodes empty-nodes - :bad-reads bad-reads})))) - -(defn workload - [opts] - {:client (Client. nil nil nil (:node-config opts)) - :checker (checker/compose - {:sequential (sequential-checker) - :timeline (timeline/html)}) - :generator (c/replication-gen - (gen/phases (cycle [(gen/time-limit 1 (gen/mix [read-ids add-node])) - (gen/once delete-node)]))) - :final-generator (gen/once read-ids)}) diff --git a/tests/jepsen/src/jepsen/memgraph/support.clj b/tests/jepsen/src/jepsen/memgraph/support.clj index 0689f858e..42e5f4ab8 100644 --- a/tests/jepsen/src/jepsen/memgraph/support.clj +++ b/tests/jepsen/src/jepsen/memgraph/support.clj @@ -25,8 +25,7 @@ :--storage-recover-on-startup :--storage-wal-enabled :--storage-snapshot-interval-sec 300 - :--storage-properties-on-edges - :--storage-restore-replicas-on-startup false)) + :--storage-properties-on-edges)) (defn stop-node! [test node] diff --git a/tests/unit/query_cost_estimator.cpp b/tests/unit/query_cost_estimator.cpp index 5adc6f88d..86ba6cd1d 100644 --- a/tests/unit/query_cost_estimator.cpp +++ b/tests/unit/query_cost_estimator.cpp @@ -49,8 +49,8 @@ class QueryCostEstimator : public ::testing::Test { int symbol_count = 0; void SetUp() { - ASSERT_TRUE(db.CreateIndex(label)); - ASSERT_TRUE(db.CreateIndex(label, property)); + ASSERT_FALSE(db.CreateIndex(label).HasError()); + ASSERT_FALSE(db.CreateIndex(label, property).HasError()); storage_dba.emplace(db.Access()); dba.emplace(&*storage_dba); } diff --git a/tests/unit/query_dump.cpp b/tests/unit/query_dump.cpp index d0397e612..c3781834c 100644 --- a/tests/unit/query_dump.cpp +++ b/tests/unit/query_dump.cpp @@ -531,8 +531,8 @@ TEST(DumpTest, IndicesKeys) { CreateVertex(&dba, {"Label1", "Label 2"}, {{"p", memgraph::storage::PropertyValue(1)}}, false); ASSERT_FALSE(dba.Commit().HasError()); } - ASSERT_TRUE(db.CreateIndex(db.NameToLabel("Label1"), db.NameToProperty("prop"))); - ASSERT_TRUE(db.CreateIndex(db.NameToLabel("Label 2"), db.NameToProperty("prop `"))); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("Label1"), db.NameToProperty("prop")).HasError()); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("Label 2"), db.NameToProperty("prop `")).HasError()); { ResultStreamFaker stream(&db); @@ -558,8 +558,7 @@ TEST(DumpTest, ExistenceConstraints) { } { auto res = db.CreateExistenceConstraint(db.NameToLabel("L`abel 1"), db.NameToProperty("prop")); - ASSERT_TRUE(res.HasValue()); - ASSERT_TRUE(res.GetValue()); + ASSERT_FALSE(res.HasError()); } { @@ -694,16 +693,15 @@ TEST(DumpTest, CheckStateSimpleGraph) { } { auto ret = db.CreateExistenceConstraint(db.NameToLabel("Person"), db.NameToProperty("name")); - ASSERT_TRUE(ret.HasValue()); - 
ASSERT_TRUE(ret.GetValue()); + ASSERT_FALSE(ret.HasError()); } { auto ret = db.CreateUniqueConstraint(db.NameToLabel("Person"), {db.NameToProperty("name")}); ASSERT_TRUE(ret.HasValue()); ASSERT_EQ(ret.GetValue(), memgraph::storage::UniqueConstraints::CreationStatus::SUCCESS); } - ASSERT_TRUE(db.CreateIndex(db.NameToLabel("Person"), db.NameToProperty("id"))); - ASSERT_TRUE(db.CreateIndex(db.NameToLabel("Person"), db.NameToProperty("unexisting_property"))); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("Person"), db.NameToProperty("id")).HasError()); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("Person"), db.NameToProperty("unexisting_property")).HasError()); const auto &db_initial_state = GetState(&db); memgraph::storage::Storage db_dump; @@ -852,19 +850,17 @@ TEST(DumpTest, MultiplePartialPulls) { memgraph::storage::Storage db; { // Create indices - db.CreateIndex(db.NameToLabel("PERSON"), db.NameToProperty("name")); - db.CreateIndex(db.NameToLabel("PERSON"), db.NameToProperty("surname")); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("PERSON"), db.NameToProperty("name")).HasError()); + ASSERT_FALSE(db.CreateIndex(db.NameToLabel("PERSON"), db.NameToProperty("surname")).HasError()); // Create existence constraints { auto res = db.CreateExistenceConstraint(db.NameToLabel("PERSON"), db.NameToProperty("name")); - ASSERT_TRUE(res.HasValue()); - ASSERT_TRUE(res.GetValue()); + ASSERT_FALSE(res.HasError()); } { auto res = db.CreateExistenceConstraint(db.NameToLabel("PERSON"), db.NameToProperty("surname")); - ASSERT_TRUE(res.HasValue()); - ASSERT_TRUE(res.GetValue()); + ASSERT_FALSE(res.HasError()); } // Create unique constraints diff --git a/tests/unit/query_plan_v2_create_set_remove_delete.cpp b/tests/unit/query_plan_v2_create_set_remove_delete.cpp index c1409daf8..db4ce11e0 100644 --- a/tests/unit/query_plan_v2_create_set_remove_delete.cpp +++ b/tests/unit/query_plan_v2_create_set_remove_delete.cpp @@ -105,7 +105,7 @@ TEST(QueryPlan, ScanAll) { TEST(QueryPlan, ScanAllByLabel) { memgraph::storage::Storage db; auto label = db.NameToLabel("label"); - ASSERT_TRUE(db.CreateIndex(label)); + ASSERT_FALSE(db.CreateIndex(label).HasError()); { auto dba = db.Access(); // Add some unlabeled vertices diff --git a/tests/unit/storage_v2_constraints.cpp b/tests/unit/storage_v2_constraints.cpp index 5c0fde426..aadc98ab0 100644 --- a/tests/unit/storage_v2_constraints.cpp +++ b/tests/unit/storage_v2_constraints.cpp @@ -11,6 +11,7 @@ #include <gmock/gmock.h> #include <gtest/gtest.h> +#include <variant> #include "storage/v2/storage.hpp" @@ -42,29 +43,29 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateAndDrop) { EXPECT_EQ(storage.ListAllConstraints().existence.size(), 0); { auto res = storage.CreateExistenceConstraint(label1, prop1); - EXPECT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } EXPECT_THAT(storage.ListAllConstraints().existence, UnorderedElementsAre(std::make_pair(label1, prop1))); { auto res = storage.CreateExistenceConstraint(label1, prop1); - EXPECT_TRUE(res.HasValue() && !res.GetValue()); + EXPECT_TRUE(res.HasError()); } EXPECT_THAT(storage.ListAllConstraints().existence, UnorderedElementsAre(std::make_pair(label1, prop1))); { auto res = storage.CreateExistenceConstraint(label2, prop1); - EXPECT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } EXPECT_THAT(storage.ListAllConstraints().existence, UnorderedElementsAre(std::make_pair(label1, prop1), std::make_pair(label2, prop1))); - EXPECT_TRUE(storage.DropExistenceConstraint(label1, prop1)); -
EXPECT_FALSE(storage.DropExistenceConstraint(label1, prop1)); + EXPECT_FALSE(storage.DropExistenceConstraint(label1, prop1).HasError()); + EXPECT_TRUE(storage.DropExistenceConstraint(label1, prop1).HasError()); EXPECT_THAT(storage.ListAllConstraints().existence, UnorderedElementsAre(std::make_pair(label2, prop1))); - EXPECT_TRUE(storage.DropExistenceConstraint(label2, prop1)); - EXPECT_FALSE(storage.DropExistenceConstraint(label2, prop2)); + EXPECT_FALSE(storage.DropExistenceConstraint(label2, prop1).HasError()); + EXPECT_TRUE(storage.DropExistenceConstraint(label2, prop2).HasError()); EXPECT_EQ(storage.ListAllConstraints().existence.size(), 0); { auto res = storage.CreateExistenceConstraint(label2, prop1); - EXPECT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } EXPECT_THAT(storage.ListAllConstraints().existence, UnorderedElementsAre(std::make_pair(label2, prop1))); } @@ -80,7 +81,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateFailure1) { { auto res = storage.CreateExistenceConstraint(label1, prop1); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set{prop1}})); } { @@ -92,7 +93,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateFailure1) { } { auto res = storage.CreateExistenceConstraint(label1, prop1); - EXPECT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } } @@ -107,7 +108,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateFailure2) { { auto res = storage.CreateExistenceConstraint(label1, prop1); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set{prop1}})); } { @@ -119,7 +120,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateFailure2) { } { auto res = storage.CreateExistenceConstraint(label1, prop1); - EXPECT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } } @@ -127,7 +128,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsCreateFailure2) { TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) { { auto res = storage.CreateExistenceConstraint(label1, prop1); - ASSERT_TRUE(res.HasValue() && res.GetValue()); + EXPECT_FALSE(res.HasError()); } { @@ -137,7 +138,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) { auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set{prop1}})); } @@ -157,7 +158,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) { auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::EXISTENCE, label1, std::set{prop1}})); } @@ -173,7 +174,7 @@ TEST_F(ConstraintsTest, ExistenceConstraintsViolationOnCommit) { ASSERT_NO_ERROR(acc.Commit()); } - ASSERT_TRUE(storage.DropExistenceConstraint(label1, prop1)); + ASSERT_FALSE(storage.DropExistenceConstraint(label1, prop1).HasError()); { auto acc = storage.Access(); @@ -208,12 +209,12 @@ TEST_F(ConstraintsTest, UniqueConstraintsCreateAndDropAndList) { EXPECT_THAT(storage.ListAllConstraints().unique, UnorderedElementsAre(std::make_pair(label1, std::set{prop1}), std::make_pair(label2, std::set{prop1}))); - EXPECT_EQ(storage.DropUniqueConstraint(label1, {prop1}), UniqueConstraints::DeletionStatus::SUCCESS); -
EXPECT_EQ(storage.DropUniqueConstraint(label1, {prop1}), UniqueConstraints::DeletionStatus::NOT_FOUND); + EXPECT_EQ(storage.DropUniqueConstraint(label1, {prop1}).GetValue(), UniqueConstraints::DeletionStatus::SUCCESS); + EXPECT_EQ(storage.DropUniqueConstraint(label1, {prop1}).GetValue(), UniqueConstraints::DeletionStatus::NOT_FOUND); EXPECT_THAT(storage.ListAllConstraints().unique, UnorderedElementsAre(std::make_pair(label2, std::set{prop1}))); - EXPECT_EQ(storage.DropUniqueConstraint(label2, {prop1}), UniqueConstraints::DeletionStatus::SUCCESS); - EXPECT_EQ(storage.DropUniqueConstraint(label2, {prop2}), UniqueConstraints::DeletionStatus::NOT_FOUND); + EXPECT_EQ(storage.DropUniqueConstraint(label2, {prop1}).GetValue(), UniqueConstraints::DeletionStatus::SUCCESS); + EXPECT_EQ(storage.DropUniqueConstraint(label2, {prop2}).GetValue(), UniqueConstraints::DeletionStatus::NOT_FOUND); EXPECT_EQ(storage.ListAllConstraints().unique.size(), 0); { auto res = storage.CreateUniqueConstraint(label2, {prop1}); @@ -239,7 +240,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsCreateFailure1) { { auto res = storage.CreateUniqueConstraint(label1, {prop1}); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } @@ -273,7 +274,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsCreateFailure2) { { auto res = storage.CreateUniqueConstraint(label1, {prop1}); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } @@ -458,7 +459,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit1) { ASSERT_NO_ERROR(vertex2.SetProperty(prop1, PropertyValue(1))); auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } } @@ -500,7 +501,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit2) { ASSERT_NO_ERROR(acc2.Commit()); auto res = acc3.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } } @@ -545,11 +546,11 @@ TEST_F(ConstraintsTest, UniqueConstraintsViolationOnCommit3) { auto res = acc2.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); res = acc3.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } } @@ -620,7 +621,8 @@ TEST_F(ConstraintsTest, UniqueConstraintsLabelAlteration) { auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), + (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); } { @@ -654,7 +656,8 @@ TEST_F(ConstraintsTest, UniqueConstraintsLabelAlteration) { auto res = acc1.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), + (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1,
std::set{prop1}})); } } @@ -669,7 +672,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsPropertySetSize) { } // Removing a constraint with empty property set should also fail. - ASSERT_EQ(storage.DropUniqueConstraint(label1, {}), UniqueConstraints::DeletionStatus::EMPTY_PROPERTIES); + ASSERT_EQ(storage.DropUniqueConstraint(label1, {}).GetValue(), UniqueConstraints::DeletionStatus::EMPTY_PROPERTIES); // Create a set of 33 properties. std::set<PropertyId> properties; @@ -686,7 +689,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsPropertySetSize) { } // An attempt to delete constraint with too large property set should fail. - ASSERT_EQ(storage.DropUniqueConstraint(label1, properties), + ASSERT_EQ(storage.DropUniqueConstraint(label1, properties).GetValue(), UniqueConstraints::DeletionStatus::PROPERTIES_SIZE_LIMIT_EXCEEDED); // Remove one property from the set. @@ -702,7 +705,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsPropertySetSize) { EXPECT_THAT(storage.ListAllConstraints().unique, UnorderedElementsAre(std::make_pair(label1, properties))); // Removing a constraint with 32 properties should succeed. - ASSERT_EQ(storage.DropUniqueConstraint(label1, properties), UniqueConstraints::DeletionStatus::SUCCESS); + ASSERT_EQ(storage.DropUniqueConstraint(label1, properties).GetValue(), UniqueConstraints::DeletionStatus::SUCCESS); ASSERT_TRUE(storage.ListAllConstraints().unique.empty()); } @@ -749,7 +752,7 @@ TEST_F(ConstraintsTest, UniqueConstraintsMultipleProperties) { ASSERT_NO_ERROR(vertex2->SetProperty(prop2, PropertyValue(2))); auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1, prop2}})); } @@ -861,7 +864,8 @@ TEST_F(ConstraintsTest, UniqueConstraintsInsertRemoveAbortInsert) { auto res = acc.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1, prop2}})); + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), + (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1, prop2}})); } } @@ -900,7 +904,8 @@ TEST_F(ConstraintsTest, UniqueConstraintsDeleteVertexSetProperty) { auto res = acc1.Commit(); ASSERT_TRUE(res.HasError()); - EXPECT_EQ(res.GetError(), (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); + EXPECT_EQ(std::get<ConstraintViolation>(res.GetError()), + (ConstraintViolation{ConstraintViolation::Type::UNIQUE, label1, std::set{prop1}})); ASSERT_NO_ERROR(acc2.Commit()); } @@ -922,7 +927,8 @@ TEST_F(ConstraintsTest, UniqueConstraintsInsertDropInsert) { ASSERT_NO_ERROR(acc.Commit()); } - ASSERT_EQ(storage.DropUniqueConstraint(label1, {prop2, prop1}), UniqueConstraints::DeletionStatus::SUCCESS); + ASSERT_EQ(storage.DropUniqueConstraint(label1, {prop2, prop1}).GetValue(), + UniqueConstraints::DeletionStatus::SUCCESS); { auto acc = storage.Access(); diff --git a/tests/unit/storage_v2_durability.cpp b/tests/unit/storage_v2_durability.cpp index fb91ad159..86bb4f756 100644 --- a/tests/unit/storage_v2_durability.cpp +++ b/tests/unit/storage_v2_durability.cpp @@ -74,10 +74,10 @@ class DurabilityTest : public ::testing::TestWithParam<bool> { auto et2 = store->NameToEdgeType("base_et2"); // Create label index. - ASSERT_TRUE(store->CreateIndex(label_unindexed)); + ASSERT_FALSE(store->CreateIndex(label_unindexed).HasError()); // Create label+property index.
- ASSERT_TRUE(store->CreateIndex(label_indexed, property_id)); + ASSERT_FALSE(store->CreateIndex(label_indexed, property_id).HasError()); // Create existence constraint. ASSERT_FALSE(store->CreateExistenceConstraint(label_unindexed, property_id).HasError()); @@ -138,10 +138,10 @@ class DurabilityTest : public ::testing::TestWithParam { auto et4 = store->NameToEdgeType("extended_et4"); // Create label index. - ASSERT_TRUE(store->CreateIndex(label_unused)); + ASSERT_FALSE(store->CreateIndex(label_unused).HasError()); // Create label+property index. - ASSERT_TRUE(store->CreateIndex(label_indexed, property_count)); + ASSERT_FALSE(store->CreateIndex(label_indexed, property_count).HasError()); // Create existence constraint. ASSERT_FALSE(store->CreateExistenceConstraint(label_unused, property_count).HasError()); @@ -1433,17 +1433,17 @@ TEST_P(DurabilityTest, WalCreateAndRemoveEverything) { CreateExtendedDataset(&store); auto indices = store.ListAllIndices(); for (const auto &index : indices.label) { - ASSERT_TRUE(store.DropIndex(index)); + ASSERT_FALSE(store.DropIndex(index).HasError()); } for (const auto &index : indices.label_property) { - ASSERT_TRUE(store.DropIndex(index.first, index.second)); + ASSERT_FALSE(store.DropIndex(index.first, index.second).HasError()); } auto constraints = store.ListAllConstraints(); for (const auto &constraint : constraints.existence) { - ASSERT_TRUE(store.DropExistenceConstraint(constraint.first, constraint.second)); + ASSERT_FALSE(store.DropExistenceConstraint(constraint.first, constraint.second).HasError()); } for (const auto &constraint : constraints.unique) { - ASSERT_EQ(store.DropUniqueConstraint(constraint.first, constraint.second), + ASSERT_EQ(store.DropUniqueConstraint(constraint.first, constraint.second).GetValue(), memgraph::storage::UniqueConstraints::DeletionStatus::SUCCESS); } auto acc = store.Access(); diff --git a/tests/unit/storage_v2_gc.cpp b/tests/unit/storage_v2_gc.cpp index 34b7b74aa..401e1a155 100644 --- a/tests/unit/storage_v2_gc.cpp +++ b/tests/unit/storage_v2_gc.cpp @@ -169,7 +169,7 @@ TEST(StorageV2Gc, Indices) { memgraph::storage::Storage storage(memgraph::storage::Config{ .gc = {.type = memgraph::storage::Config::Gc::Type::PERIODIC, .interval = std::chrono::milliseconds(100)}}); - ASSERT_TRUE(storage.CreateIndex(storage.NameToLabel("label"))); + ASSERT_FALSE(storage.CreateIndex(storage.NameToLabel("label")).HasError()); { auto acc0 = storage.Access(); diff --git a/tests/unit/storage_v2_indices.cpp b/tests/unit/storage_v2_indices.cpp index 894752bf7..eb05277f7 100644 --- a/tests/unit/storage_v2_indices.cpp +++ b/tests/unit/storage_v2_indices.cpp @@ -78,7 +78,7 @@ TEST_F(IndexTest, LabelIndexCreate) { ASSERT_NO_ERROR(acc.Commit()); } - EXPECT_TRUE(storage.CreateIndex(label1)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); { auto acc = storage.Access(); @@ -163,7 +163,7 @@ TEST_F(IndexTest, LabelIndexDrop) { ASSERT_NO_ERROR(acc.Commit()); } - EXPECT_TRUE(storage.CreateIndex(label1)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); { auto acc = storage.Access(); @@ -171,14 +171,14 @@ TEST_F(IndexTest, LabelIndexDrop) { EXPECT_THAT(GetIds(acc.Vertices(label1, View::NEW), View::NEW), UnorderedElementsAre(1, 3, 5, 7, 9)); } - EXPECT_TRUE(storage.DropIndex(label1)); + EXPECT_FALSE(storage.DropIndex(label1).HasError()); { auto acc = storage.Access(); EXPECT_FALSE(acc.LabelIndexExists(label1)); } EXPECT_EQ(storage.ListAllIndices().label.size(), 0); - EXPECT_FALSE(storage.DropIndex(label1)); + 
EXPECT_TRUE(storage.DropIndex(label1).HasError()); { auto acc = storage.Access(); EXPECT_FALSE(acc.LabelIndexExists(label1)); @@ -194,7 +194,7 @@ TEST_F(IndexTest, LabelIndexDrop) { ASSERT_NO_ERROR(acc.Commit()); } - EXPECT_TRUE(storage.CreateIndex(label1)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); { auto acc = storage.Access(); EXPECT_TRUE(acc.LabelIndexExists(label1)); @@ -227,8 +227,8 @@ TEST_F(IndexTest, LabelIndexBasic) { // 3. Remove Label1 from odd numbered vertices, and add it to even numbered // vertices. // 4. Delete even numbered vertices. - EXPECT_TRUE(storage.CreateIndex(label1)); - EXPECT_TRUE(storage.CreateIndex(label2)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); + EXPECT_FALSE(storage.CreateIndex(label2).HasError()); auto acc = storage.Access(); EXPECT_THAT(storage.ListAllIndices().label, UnorderedElementsAre(label1, label2)); @@ -292,8 +292,8 @@ TEST_F(IndexTest, LabelIndexDuplicateVersions) { // By removing labels and adding them again we create duplicate entries for // the same vertex in the index (they only differ by the timestamp). This test // checks that duplicates are properly filtered out. - EXPECT_TRUE(storage.CreateIndex(label1)); - EXPECT_TRUE(storage.CreateIndex(label2)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); + EXPECT_FALSE(storage.CreateIndex(label2).HasError()); { auto acc = storage.Access(); @@ -329,8 +329,8 @@ TEST_F(IndexTest, LabelIndexDuplicateVersions) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelIndexTransactionalIsolation) { // Check that transactions only see entries they are supposed to see. - EXPECT_TRUE(storage.CreateIndex(label1)); - EXPECT_TRUE(storage.CreateIndex(label2)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); + EXPECT_FALSE(storage.CreateIndex(label2).HasError()); auto acc_before = storage.Access(); auto acc = storage.Access(); @@ -356,8 +356,8 @@ TEST_F(IndexTest, LabelIndexTransactionalIsolation) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelIndexCountEstimate) { - EXPECT_TRUE(storage.CreateIndex(label1)); - EXPECT_TRUE(storage.CreateIndex(label2)); + EXPECT_FALSE(storage.CreateIndex(label1).HasError()); + EXPECT_FALSE(storage.CreateIndex(label2).HasError()); auto acc = storage.Access(); for (int i = 0; i < 20; ++i) { @@ -372,7 +372,7 @@ TEST_F(IndexTest, LabelIndexCountEstimate) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelPropertyIndexCreateAndDrop) { EXPECT_EQ(storage.ListAllIndices().label_property.size(), 0); - EXPECT_TRUE(storage.CreateIndex(label1, prop_id)); + EXPECT_FALSE(storage.CreateIndex(label1, prop_id).HasError()); { auto acc = storage.Access(); EXPECT_TRUE(acc.LabelPropertyIndexExists(label1, prop_id)); @@ -382,10 +382,10 @@ TEST_F(IndexTest, LabelPropertyIndexCreateAndDrop) { auto acc = storage.Access(); EXPECT_FALSE(acc.LabelPropertyIndexExists(label2, prop_id)); } - EXPECT_FALSE(storage.CreateIndex(label1, prop_id)); + EXPECT_TRUE(storage.CreateIndex(label1, prop_id).HasError()); EXPECT_THAT(storage.ListAllIndices().label_property, UnorderedElementsAre(std::make_pair(label1, prop_id))); - EXPECT_TRUE(storage.CreateIndex(label2, prop_id)); + EXPECT_FALSE(storage.CreateIndex(label2, prop_id).HasError()); { auto acc = storage.Access(); EXPECT_TRUE(acc.LabelPropertyIndexExists(label2, prop_id)); @@ -393,15 +393,15 @@ TEST_F(IndexTest, LabelPropertyIndexCreateAndDrop) { EXPECT_THAT(storage.ListAllIndices().label_property, UnorderedElementsAre(std::make_pair(label1, prop_id), 
std::make_pair(label2, prop_id))); - EXPECT_TRUE(storage.DropIndex(label1, prop_id)); + EXPECT_FALSE(storage.DropIndex(label1, prop_id).HasError()); { auto acc = storage.Access(); EXPECT_FALSE(acc.LabelPropertyIndexExists(label1, prop_id)); } EXPECT_THAT(storage.ListAllIndices().label_property, UnorderedElementsAre(std::make_pair(label2, prop_id))); - EXPECT_FALSE(storage.DropIndex(label1, prop_id)); + EXPECT_TRUE(storage.DropIndex(label1, prop_id).HasError()); - EXPECT_TRUE(storage.DropIndex(label2, prop_id)); + EXPECT_FALSE(storage.DropIndex(label2, prop_id).HasError()); { auto acc = storage.Access(); EXPECT_FALSE(acc.LabelPropertyIndexExists(label2, prop_id)); @@ -416,8 +416,8 @@ TEST_F(IndexTest, LabelPropertyIndexCreateAndDrop) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelPropertyIndexBasic) { - storage.CreateIndex(label1, prop_val); - storage.CreateIndex(label2, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); + EXPECT_FALSE(storage.CreateIndex(label2, prop_val).HasError()); auto acc = storage.Access(); EXPECT_THAT(GetIds(acc.Vertices(label1, prop_val, View::OLD), View::OLD), IsEmpty()); @@ -476,7 +476,7 @@ TEST_F(IndexTest, LabelPropertyIndexBasic) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelPropertyIndexDuplicateVersions) { - storage.CreateIndex(label1, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); { auto acc = storage.Access(); for (int i = 0; i < 5; ++i) { @@ -511,7 +511,7 @@ TEST_F(IndexTest, LabelPropertyIndexDuplicateVersions) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelPropertyIndexTransactionalIsolation) { - storage.CreateIndex(label1, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); auto acc_before = storage.Access(); auto acc = storage.Access(); @@ -545,7 +545,7 @@ TEST_F(IndexTest, LabelPropertyIndexFiltering) { // We also have a mix of doubles and integers to verify that they are sorted // properly. 
- storage.CreateIndex(label1, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); { auto acc = storage.Access(); @@ -603,7 +603,7 @@ TEST_F(IndexTest, LabelPropertyIndexFiltering) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST_F(IndexTest, LabelPropertyIndexCountEstimate) { - storage.CreateIndex(label1, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); auto acc = storage.Access(); for (int i = 1; i <= 10; ++i) { @@ -625,7 +625,7 @@ TEST_F(IndexTest, LabelPropertyIndexCountEstimate) { } TEST_F(IndexTest, LabelPropertyIndexMixedIteration) { - storage.CreateIndex(label1, prop_val); + EXPECT_FALSE(storage.CreateIndex(label1, prop_val).HasError()); const std::array temporals{TemporalData{TemporalType::Date, 23}, TemporalData{TemporalType::Date, 28}, TemporalData{TemporalType::LocalDateTime, 20}}; diff --git a/tests/unit/storage_v2_replication.cpp b/tests/unit/storage_v2_replication.cpp index e453ae17e..555b7ab27 100644 --- a/tests/unit/storage_v2_replication.cpp +++ b/tests/unit/storage_v2_replication.cpp @@ -210,8 +210,8 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { const auto *property = "property"; const auto *property_extra = "property_extra"; { - ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label))); - ASSERT_TRUE(main_store.CreateIndex(main_store.NameToLabel(label), main_store.NameToProperty(property))); + ASSERT_FALSE(main_store.CreateIndex(main_store.NameToLabel(label)).HasError()); + ASSERT_FALSE(main_store.CreateIndex(main_store.NameToLabel(label), main_store.NameToProperty(property)).HasError()); ASSERT_FALSE( main_store.CreateExistenceConstraint(main_store.NameToLabel(label), main_store.NameToProperty(property)) .HasError()); @@ -241,13 +241,15 @@ TEST_F(ReplicationTest, BasicSynchronousReplicationTest) { // existence constraint drop // unique constriant drop { - ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label))); - ASSERT_TRUE(main_store.DropIndex(main_store.NameToLabel(label), main_store.NameToProperty(property))); - ASSERT_TRUE(main_store.DropExistenceConstraint(main_store.NameToLabel(label), main_store.NameToProperty(property))); - ASSERT_EQ( - main_store.DropUniqueConstraint(main_store.NameToLabel(label), {main_store.NameToProperty(property), - main_store.NameToProperty(property_extra)}), - memgraph::storage::UniqueConstraints::DeletionStatus::SUCCESS); + ASSERT_FALSE(main_store.DropIndex(main_store.NameToLabel(label)).HasError()); + ASSERT_FALSE(main_store.DropIndex(main_store.NameToLabel(label), main_store.NameToProperty(property)).HasError()); + ASSERT_FALSE(main_store.DropExistenceConstraint(main_store.NameToLabel(label), main_store.NameToProperty(property)) + .HasError()); + ASSERT_EQ(main_store + .DropUniqueConstraint(main_store.NameToLabel(label), {main_store.NameToProperty(property), + main_store.NameToProperty(property_extra)}) + .GetValue(), + memgraph::storage::UniqueConstraints::DeletionStatus::SUCCESS); } {