From 87423eed06f549394e857a3ea72b9eee39719bc7 Mon Sep 17 00:00:00 2001 From: Matija Santl <matija.santl@memgraph.com> Date: Fri, 24 May 2019 13:26:24 +0200 Subject: [PATCH] Add constraint in HA Summary: HA should now support constraints in the same way the SM version does. I only tested this thing manually, but I plan to add a new integration test for this also. Reviewers: ipaljak, vkasljevic, mferencevic Reviewed By: ipaljak, mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2083 --- src/CMakeLists.txt | 2 + .../single_node_ha/graph_db_accessor.cpp | 130 +++++++++++++++++- .../single_node_ha/graph_db_accessor.hpp | 78 ++++++++++- src/durability/single_node_ha/state_delta.cpp | 99 +++++++++++++ src/durability/single_node_ha/state_delta.lcp | 15 +- src/query/interpreter.cpp | 4 +- .../single_node_ha/record_accessor.cpp | 13 +- .../single_node_ha/record_accessor.hpp | 8 +- src/storage/single_node_ha/storage.hpp | 4 + src/storage/single_node_ha/storage_gc.hpp | 2 +- .../single_node_ha/vertex_accessor.cpp | 3 +- tests/integration/CMakeLists.txt | 3 + tests/integration/apollo_runs.yaml | 10 ++ .../integration/ha/constraints/CMakeLists.txt | 6 + tests/integration/ha/constraints/raft.json | 7 + tests/integration/ha/constraints/runner.py | 122 ++++++++++++++++ tests/integration/ha/constraints/tester.cpp | 99 +++++++++++++ 17 files changed, 578 insertions(+), 27 deletions(-) create mode 100644 tests/integration/ha/constraints/CMakeLists.txt create mode 100644 tests/integration/ha/constraints/raft.json create mode 100755 tests/integration/ha/constraints/runner.py create mode 100644 tests/integration/ha/constraints/tester.cpp diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6ea1aa26f..e2421b000 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -271,6 +271,8 @@ set(mg_single_node_ha_sources query/plan/variable_start_planner.cpp query/repl.cpp query/typed_value.cpp + storage/common/constraints/record.cpp + storage/common/constraints/unique_constraints.cpp storage/common/types/property_value.cpp storage/common/types/slk.cpp storage/common/types/property_value_store.cpp diff --git a/src/database/single_node_ha/graph_db_accessor.cpp b/src/database/single_node_ha/graph_db_accessor.cpp index a93312b9a..3c077f435 100644 --- a/src/database/single_node_ha/graph_db_accessor.cpp +++ b/src/database/single_node_ha/graph_db_accessor.cpp @@ -198,24 +198,142 @@ void GraphDbAccessor::DeleteIndex(storage::Label label, } } -void GraphDbAccessor::UpdateLabelIndices(storage::Label label, - const VertexAccessor &vertex_accessor, - const Vertex *const vertex) { +void GraphDbAccessor::BuildUniqueConstraint( + storage::Label label, const std::vector<storage::Property> &properties) { + DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + + storage::constraints::ConstraintEntry entry{label, properties}; + if (!db_->storage().unique_constraints_.AddConstraint(entry)) { + // Already exists + return; + } + + try { + auto dba = db_->AccessBlocking(std::make_optional(transaction().id_)); + + for (auto v : dba.Vertices(false)) { + if (std::find(v.labels().begin(), v.labels().end(), label) != + v.labels().end()) { + db_->storage().unique_constraints_.Update(v, dba.transaction()); + } + } + + std::vector<std::string> property_names(properties.size()); + std::transform(properties.begin(), properties.end(), property_names.begin(), + [&dba](storage::Property property) { + return dba.PropertyName(property); + }); + + dba.raft()->Emplace(database::StateDelta::BuildUniqueConstraint( + dba.transaction().id_, label, dba.LabelName(label), properties, + property_names)); + + dba.Commit(); + + } catch (const tx::TransactionEngineError &e) { + db_->storage().unique_constraints_.RemoveConstraint(entry); + throw TransactionException(e.what()); + } catch (const storage::constraints::ViolationException &e) { + db_->storage().unique_constraints_.RemoveConstraint(entry); + throw ConstraintViolationException(e.what()); + } catch (const storage::constraints::SerializationException &e) { + db_->storage().unique_constraints_.RemoveConstraint(entry); + throw mvcc::SerializationError(); + } catch (...) { + db_->storage().unique_constraints_.RemoveConstraint(entry); + throw; + } +} + +void GraphDbAccessor::DeleteUniqueConstraint( + storage::Label label, const std::vector<storage::Property> &properties) { + storage::constraints::ConstraintEntry entry{label, properties}; + try { + auto dba = db_->AccessBlocking(std::make_optional(transaction().id_)); + + if (!db_->storage().unique_constraints_.RemoveConstraint(entry)) { + // Nothing was deleted + return; + } + + std::vector<std::string> property_names(properties.size()); + std::transform(properties.begin(), properties.end(), property_names.begin(), + [&dba](storage::Property property) { + return dba.PropertyName(property); + }); + + dba.raft()->Emplace(database::StateDelta::DropUniqueConstraint( + dba.transaction().id_, label, dba.LabelName(label), properties, + property_names)); + + dba.Commit(); + } catch (const tx::TransactionEngineError &e) { + throw TransactionException(e.what()); + } +} + +std::vector<storage::constraints::ConstraintEntry> +GraphDbAccessor::ListUniqueConstraints() const { + return db_->storage().unique_constraints_.ListConstraints(); +} + +void GraphDbAccessor::UpdateOnAddLabel(storage::Label label, + const VertexAccessor &vertex_accessor, + const Vertex *vertex) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; auto *vlist_ptr = vertex_accessor.address(); + try { + db_->storage().unique_constraints_.UpdateOnAddLabel(label, vertex_accessor, + transaction()); + } catch (const storage::constraints::SerializationException &e) { + throw mvcc::SerializationError(); + } catch (const storage::constraints::ViolationException &e) { + throw ConstraintViolationException(e.what()); + } + db_->storage().label_property_index_.UpdateOnLabel(label, vlist_ptr, vertex); db_->storage().labels_index_.Update(label, vlist_ptr, vertex); } -void GraphDbAccessor::UpdatePropertyIndex( - storage::Property property, const RecordAccessor<Vertex> &vertex_accessor, - const Vertex *const vertex) { +void GraphDbAccessor::UpdateOnRemoveLabel( + storage::Label label, const RecordAccessor<Vertex> &accessor) { + db_->storage().unique_constraints_.UpdateOnRemoveLabel(label, accessor, + transaction()); +} + +void GraphDbAccessor::UpdateOnAddProperty( + storage::Property property, const PropertyValue &previous_value, + const PropertyValue &new_value, + const RecordAccessor<Vertex> &vertex_accessor, const Vertex *vertex) { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + + try { + db_->storage().unique_constraints_.UpdateOnAddProperty( + property, previous_value, new_value, vertex_accessor, transaction()); + } catch (const storage::constraints::SerializationException &e) { + throw mvcc::SerializationError(); + } catch (const storage::constraints::ViolationException &e) { + throw ConstraintViolationException(e.what()); + } + db_->storage().label_property_index_.UpdateOnProperty( property, vertex_accessor.address(), vertex); } +void GraphDbAccessor::UpdateOnRemoveProperty( + storage::Property property, const PropertyValue &previous_value, + const RecordAccessor<Vertex> &accessor, const Vertex *vertex) { + DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; + + try { + db_->storage().unique_constraints_.UpdateOnRemoveProperty( + property, previous_value, accessor, transaction()); + } catch (const storage::constraints::SerializationException &e) { + throw mvcc::SerializationError(); + } +} + int64_t GraphDbAccessor::VerticesCount() const { DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted"; return db_->storage().vertices_.access().size(); diff --git a/src/database/single_node_ha/graph_db_accessor.hpp b/src/database/single_node_ha/graph_db_accessor.hpp index a0014cb2f..2d52e7023 100644 --- a/src/database/single_node_ha/graph_db_accessor.hpp +++ b/src/database/single_node_ha/graph_db_accessor.hpp @@ -13,6 +13,7 @@ #include "database/single_node_ha/graph_db.hpp" #include "raft/raft_interface.hpp" +#include "storage/common/constraints/exceptions.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/edge_accessor.hpp" #include "storage/single_node_ha/vertex_accessor.hpp" @@ -432,6 +433,32 @@ class GraphDbAccessor { /// Writes Index (key) creation to Raft, marks it as ready for usage void EnableIndex(const LabelPropertyIndex::Key &key); + /** + * Creates new unique constraint that consists of a label and multiple + * properties. + * If the constraint already exists, this method does nothing. + * + * @throws ConstraintViolationException if constraint couldn't be build + * due to existing constraint violation. + * @throws TransactionEngineError if the engine doesn't accept transactions. + * @throws mvcc::SerializationError on serialization errors. + */ + void BuildUniqueConstraint(storage::Label label, + const std::vector<storage::Property> &properties); + + /** + * Deletes existing unique constraint. + * If the constraint doesn't exist, this method does nothing. + */ + void DeleteUniqueConstraint(storage::Label label, + const std::vector<storage::Property> &properties); + + /** + * Returns a list of currently active unique constraints. + */ + std::vector<storage::constraints::ConstraintEntry> ListUniqueConstraints() + const; + /** * @brief - Returns true if the given label+property index already exists and * is ready for use. @@ -608,15 +635,52 @@ class GraphDbAccessor { bool aborted_{false}; /** - * Insert this vertex into corresponding any label + 'property' index. - * @param property - vertex will be inserted into indexes which contain this - * property - * @param vertex_accessor - vertex accessor to insert - * @param vertex - vertex to insert + * Notifies storage about label addition. + * + * @param label - label that was added + * @param vertex_accessor - vertex_accessor that was updated + * @param vertex - vertex that was updated */ - void UpdatePropertyIndex(storage::Property property, + void UpdateOnAddLabel(storage::Label label, + const VertexAccessor &vertex_accessor, + const Vertex *vertex); + + /** + * Notifies storage about label removal. + * + * @param label - label that was removed + * @param vertex_accessor - vertex_accessor that was updated + */ + void UpdateOnRemoveLabel(storage::Label label, + const RecordAccessor<Vertex> &accessor); + + /** + * Notifies storage about a property removal. + * + * @param property - property that was removed + * @param previous_value - previous value of the property + * @param vertex_accessor - vertex_accessor that was updated + * @param vertex - vertex that was updated + */ + void UpdateOnRemoveProperty(storage::Property property, + const PropertyValue &previous_value, + const RecordAccessor<Vertex> &accessor, + const Vertex *vertex); + + /** + * Notifies storage about a property addition. + * + * @param property - property that was added + * @param previous_value - previous value of the property + * @param new_value - new value of the property + * @param vertex_accessor - vertex accessor that was updated + * @param vertex - vertex that was updated + */ + void UpdateOnAddProperty(storage::Property property, + const PropertyValue &previous_value, + const PropertyValue &new_value, const RecordAccessor<Vertex> &vertex_accessor, - const Vertex *const vertex); + const Vertex *vertex); }; } // namespace database diff --git a/src/durability/single_node_ha/state_delta.cpp b/src/durability/single_node_ha/state_delta.cpp index 1019a3c05..a5f87bb2c 100644 --- a/src/durability/single_node_ha/state_delta.cpp +++ b/src/durability/single_node_ha/state_delta.cpp @@ -129,6 +129,32 @@ StateDelta StateDelta::NoOp(tx::TransactionId tx_id) { return op; } +StateDelta StateDelta::BuildUniqueConstraint( + tx::TransactionId tx_id, storage::Label label, + const std::string &label_name, + const std::vector<storage::Property> &properties, + const std::vector<std::string> &property_names) { + StateDelta op(StateDelta::Type::BUILD_UNIQUE_CONSTRAINT, tx_id); + op.label = label; + op.label_name = label_name; + op.properties = properties; + op.property_names = property_names; + return op; +} + +StateDelta StateDelta::DropUniqueConstraint( + tx::TransactionId tx_id, storage::Label label, + const std::string &label_name, + const std::vector<storage::Property> &properties, + const std::vector<std::string> &property_names) { + StateDelta op(StateDelta::Type::DROP_UNIQUE_CONSTRAINT, tx_id); + op.label = label; + op.label_name = label_name; + op.properties = properties; + op.property_names = property_names; + return op; +} + void StateDelta::Encode( HashedFileWriter &writer, communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const { @@ -187,6 +213,28 @@ void StateDelta::Encode( encoder.WriteInt(property.Id()); encoder.WriteString(property_name); break; + case Type::BUILD_UNIQUE_CONSTRAINT: + encoder.WriteInt(label.Id()); + encoder.WriteString(label_name); + encoder.WriteInt(properties.size()); + for (auto prop : properties) { + encoder.WriteInt(prop.Id()); + } + for (auto &name : property_names) { + encoder.WriteString(name); + } + break; + case Type::DROP_UNIQUE_CONSTRAINT: + encoder.WriteInt(label.Id()); + encoder.WriteString(label_name); + encoder.WriteInt(properties.size()); + for (auto prop : properties) { + encoder.WriteInt(prop.Id()); + } + for (auto &name : property_names) { + encoder.WriteString(name); + } + break; } writer.WriteValue(writer.hash()); @@ -268,6 +316,38 @@ std::optional<StateDelta> StateDelta::Decode( DECODE_MEMBER_CAST(property, ValueInt, storage::Property) DECODE_MEMBER(property_name, ValueString) break; + case Type::BUILD_UNIQUE_CONSTRAINT: { + DECODE_MEMBER_CAST(label, ValueInt, storage::Label) + DECODE_MEMBER(label_name, ValueString) + if (!decoder.ReadValue(&dv)) return nullopt; + int size = dv.ValueInt(); + for (size_t i = 0; i < size; ++i) { + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.properties.push_back( + static_cast<storage::Property>(dv.ValueInt())); + } + for (size_t i = 0; i < size; ++i) { + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.property_names.push_back(dv.ValueString()); + } + break; + } + case Type::DROP_UNIQUE_CONSTRAINT: { + DECODE_MEMBER_CAST(label, ValueInt, storage::Label) + DECODE_MEMBER(label_name, ValueString) + if (!decoder.ReadValue(&dv)) return nullopt; + int size = dv.ValueInt(); + for (size_t i = 0; i < size; ++i) { + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.properties.push_back( + static_cast<storage::Property>(dv.ValueInt())); + } + for (size_t i = 0; i < size; ++i) { + if (!decoder.ReadValue(&dv)) return nullopt; + r_val.property_names.push_back(dv.ValueString()); + } + break; + } } auto decoder_hash = reader.hash(); @@ -342,6 +422,25 @@ void StateDelta::Apply(GraphDbAccessor &dba) const { } case Type::NO_OP: break; + case Type::BUILD_UNIQUE_CONSTRAINT: { + std::vector<storage::Property> properties; + properties.reserve(property_names.size()); + for (auto &p : property_names) { + properties.push_back(dba.Property(p)); + } + + dba.BuildUniqueConstraint(dba.Label(label_name), properties); + } break; + case Type::DROP_UNIQUE_CONSTRAINT: { + std::vector<storage::Property> properties; + properties.reserve(property_names.size()); + for (auto &p : property_names) { + properties.push_back(dba.Property(p)); + } + + dba.DeleteUniqueConstraint(dba.Label(label_name), properties); + } break; + } } diff --git a/src/durability/single_node_ha/state_delta.lcp b/src/durability/single_node_ha/state_delta.lcp index bd204aead..ac47b46c1 100644 --- a/src/durability/single_node_ha/state_delta.lcp +++ b/src/durability/single_node_ha/state_delta.lcp @@ -35,6 +35,8 @@ cpp<# (edge-type-name "std::string") (property "storage::Property") (property-name "std::string") + (properties "std::vector<storage::Property>") + (property-names "std::vector<std::string>") (value "PropertyValue" :initval "PropertyValue::Null") (label "storage::Label") (label-name "std::string") @@ -67,6 +69,8 @@ in StateDeltas.") build-index ;; label, label_name, property, property_name drop-index ;; label, label_name, property, property_name no-op ;; no-op state delta required by Raft protocol + build-unique_constraint ;; label, label_name, properties, property_names + drop-unique_constraint ;; label, label_name, properties, property_names ) (:documentation "Defines StateDelta type. For each type the comment indicates which values @@ -126,8 +130,17 @@ omitted in the comment.") const std::string &label_name, storage::Property property, const std::string &property_name); - static StateDelta NoOp(tx::TransactionId tx_id); + static StateDelta BuildUniqueConstraint( + tx::TransactionId tx_id, storage::Label label, + const std::string &label_name, + const std::vector<storage::Property> &properties, + const std::vector<std::string> &property_names); + static StateDelta DropUniqueConstraint( + tx::TransactionId tx_id, storage::Label label, + const std::string &label_name, + const std::vector<storage::Property> &property, + const std::vector<std::string> &property_names); /// Applies CRUD delta to database accessor. Fails on other types of deltas void Apply(GraphDbAccessor &dba) const; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 17b21a1f1..f066de9f0 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -624,7 +624,7 @@ Callback HandleInfoQuery(InfoQuery *info_query, }; break; case InfoQuery::InfoType::CONSTRAINT: -#ifdef MG_SINGLE_NODE +#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA) callback.header = {"constraint type", "label", "properties"}; callback.fn = [db_accessor] { std::vector<std::vector<TypedValue>> results; @@ -669,7 +669,7 @@ Callback HandleInfoQuery(InfoQuery *info_query, Callback HandleConstraintQuery(ConstraintQuery *constraint_query, database::GraphDbAccessor *db_accessor) { -#ifdef MG_SINGLE_NODE +#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA) std::vector<storage::Property> properties; auto label = db_accessor->Label(constraint_query->constraint_.label.name); properties.reserve(constraint_query->constraint_.properties.size()); diff --git a/src/storage/single_node_ha/record_accessor.cpp b/src/storage/single_node_ha/record_accessor.cpp index 40731da01..48dc65518 100644 --- a/src/storage/single_node_ha/record_accessor.cpp +++ b/src/storage/single_node_ha/record_accessor.cpp @@ -25,9 +25,10 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key, auto &dba = db_accessor(); auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), value); + auto previous_value = PropsAt(key); update().properties_.set(key, value); - dba.UpdatePropertyIndex(key, *this, &update()); - db_accessor().raft()->Emplace(delta); + dba.UpdateOnAddProperty(key, previous_value, value, *this, &update()); + dba.raft()->Emplace(delta); } template <> @@ -38,7 +39,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key, dba.PropertyName(key), value); update().properties_.set(key, value); - db_accessor().raft()->Emplace(delta); + dba.raft()->Emplace(delta); } template <> @@ -47,8 +48,10 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) { auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); + auto previous_value = PropsAt(key); update().properties_.set(key, PropertyValue::Null); - db_accessor().raft()->Emplace(delta); + dba.UpdateOnRemoveProperty(key, previous_value, *this, &update()); + dba.raft()->Emplace(delta); } template <> @@ -58,7 +61,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) { StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key, dba.PropertyName(key), PropertyValue::Null); update().properties_.set(key, PropertyValue::Null); - db_accessor().raft()->Emplace(delta); + dba.raft()->Emplace(delta); } template <typename TRecord> diff --git a/src/storage/single_node_ha/record_accessor.hpp b/src/storage/single_node_ha/record_accessor.hpp index d11c2e319..29e46ebdb 100644 --- a/src/storage/single_node_ha/record_accessor.hpp +++ b/src/storage/single_node_ha/record_accessor.hpp @@ -149,6 +149,10 @@ class RecordAccessor { */ int64_t CypherId() const; + /** Returns the current version (either new_ or old_) set on this + * RecordAccessor. */ + const TRecord ¤t() const; + protected: /** * Pointer to the version (either old_ or new_) that READ operations @@ -160,10 +164,6 @@ class RecordAccessor { */ mutable TRecord *current_{nullptr}; - /** Returns the current version (either new_ or old_) set on this - * RecordAccessor. */ - const TRecord ¤t() const; - private: // The database accessor for which this record accessor is created // Provides means of getting to the transaction and database functions. diff --git a/src/storage/single_node_ha/storage.hpp b/src/storage/single_node_ha/storage.hpp index ac9a56400..99c3fb9cd 100644 --- a/src/storage/single_node_ha/storage.hpp +++ b/src/storage/single_node_ha/storage.hpp @@ -4,6 +4,7 @@ #include <optional> #include "data_structures/concurrent/concurrent_map.hpp" +#include "storage/common/constraints/unique_constraints.hpp" #include "storage/common/kvstore/kvstore.hpp" #include "storage/common/types/types.hpp" #include "storage/single_node_ha/edge.hpp" @@ -75,6 +76,9 @@ class Storage { KeyIndex<storage::Label, Vertex> labels_index_; LabelPropertyIndex label_property_index_; + // unique constraints + storage::constraints::UniqueConstraints unique_constraints_; + std::vector<std::string> properties_on_disk_; /// Gets the Vertex/Edge main storage map. diff --git a/src/storage/single_node_ha/storage_gc.hpp b/src/storage/single_node_ha/storage_gc.hpp index d75eae987..bf3f6b4f0 100644 --- a/src/storage/single_node_ha/storage_gc.hpp +++ b/src/storage/single_node_ha/storage_gc.hpp @@ -96,7 +96,7 @@ class StorageGc { vertices_.gc_.Run(snapshot_gc, tx_engine_); edges_.gc_.Run(snapshot_gc, tx_engine_); - + storage_.unique_constraints_.Refresh(snapshot_gc, tx_engine_); VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count(); } // This has to be run sequentially after gc because gc modifies diff --git a/src/storage/single_node_ha/vertex_accessor.cpp b/src/storage/single_node_ha/vertex_accessor.cpp index 63ce36f80..6d304d43e 100644 --- a/src/storage/single_node_ha/vertex_accessor.cpp +++ b/src/storage/single_node_ha/vertex_accessor.cpp @@ -25,7 +25,7 @@ void VertexAccessor::add_label(storage::Label label) { if (!utils::Contains(vertex.labels_, label)) { vertex.labels_.emplace_back(label); dba.raft()->Emplace(delta); - dba.UpdateLabelIndices(label, *this, &vertex); + dba.UpdateOnAddLabel(label, *this, &vertex); } } @@ -40,6 +40,7 @@ void VertexAccessor::remove_label(storage::Label label) { std::swap(*found, labels.back()); labels.pop_back(); dba.raft()->Emplace(delta); + dba.UpdateOnRemoveLabel(label, *this); } } diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 377679694..0a7dee3e0 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -19,6 +19,9 @@ add_subdirectory(distributed) # distributed ha/basic binaries add_subdirectory(ha/basic) +# distributed ha/constraints binaries +add_subdirectory(ha/constraints) + # distributed ha/index binaries add_subdirectory(ha/index) diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml index a97714609..34ea19673 100644 --- a/tests/integration/apollo_runs.yaml +++ b/tests/integration/apollo_runs.yaml @@ -82,6 +82,16 @@ - ../../../../build_debug/memgraph_ha # memgraph ha binary - ../../../../build_debug/tests/integration/ha/basic/tester # tester binary +- name: integration__ha_constraints + cd: ha/constraints + commands: ./runner.py + infiles: + - runner.py # runner script + - raft.json # raft configuration + - ../ha_test.py # raft test base module + - ../../../../build_debug/memgraph_ha # memgraph ha binary + - ../../../../build_debug/tests/integration/ha/constraints/tester # tester binary + - name: integration__ha_index cd: ha/index commands: ./runner.py diff --git a/tests/integration/ha/constraints/CMakeLists.txt b/tests/integration/ha/constraints/CMakeLists.txt new file mode 100644 index 000000000..270e22861 --- /dev/null +++ b/tests/integration/ha/constraints/CMakeLists.txt @@ -0,0 +1,6 @@ +set(target_name memgraph__integration__ha_constraints) +set(tester_target_name ${target_name}__tester) + +add_executable(${tester_target_name} tester.cpp) +set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) +target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/constraints/raft.json b/tests/integration/ha/constraints/raft.json new file mode 100644 index 000000000..93a21c5b7 --- /dev/null +++ b/tests/integration/ha/constraints/raft.json @@ -0,0 +1,7 @@ +{ + "election_timeout_min": 200, + "election_timeout_max": 500, + "heartbeat_interval": 100, + "replication_timeout": 10000, + "log_size_snapshot_threshold": -1 +} diff --git a/tests/integration/ha/constraints/runner.py b/tests/integration/ha/constraints/runner.py new file mode 100755 index 000000000..fcd57f6cc --- /dev/null +++ b/tests/integration/ha/constraints/runner.py @@ -0,0 +1,122 @@ +#!/usr/bin/python3 + +import argparse +import os +import time +import random +import subprocess +import sys + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) + +# append parent directory +sys.path.append(os.path.join(SCRIPT_DIR, "..")) + +from ha_test import HaTestBase + + +class HaIndexTest(HaTestBase): + def execute_step(self, step, property_value=None, expected_status=None, + expected_result=None): + if step == "create": + print("Executing create query") + client = subprocess.Popen([self.tester_binary, "--step", "create", + "--cluster_size", str(self.cluster_size)]) + + elif step == "drop": + print("Executing drop query") + client = subprocess.Popen([self.tester_binary, "--step", "drop", + "--cluster_size", str(self.cluster_size)]) + elif step == "add_node": + print("Executing add_node query ") + client = subprocess.Popen([self.tester_binary, "--step", "add_node", + "--cluster_size", str(self.cluster_size), "--expected_status", + str(expected_status), "--property_value", str(property_value)]) + + elif step == "check": + print("Executing check query") + client = subprocess.Popen([self.tester_binary, "--step", "check", + "--cluster_size", str(self.cluster_size), "--expected_result", + str(expected_result)]) + else: + raise ValueError("Invalid step argument: " + step) + + # Check what happened with query execution. + try: + code = client.wait(timeout=30) + except subprocess.TimeoutExpired as e: + print("HA client timed out!") + client.kill() + return 1 + + return code + + + def execute(self): + self.start_cluster() + num_nodes = 1 + + assert self.execute_step("add_node", expected_status=0, \ + property_value=num_nodes) == 0, \ + "Error while executing add_node query" + + assert self.execute_step("create") == 0, \ + "Error while executing create query" + + assert self.execute_step("check", expected_result=num_nodes) == 0, \ + "Error while executing check query" + + for i in range(self.cluster_size): + # Kill worker. + print("Killing worker {}".format(i)) + self.kill_worker(i) + + assert self.execute_step("add_node", expected_status=1, \ + property_value=num_nodes) == 0, \ + "Error while executing add_node query" + + assert self.execute_step("add_node", expected_status=0, \ + property_value=num_nodes + 1) == 0, \ + "Error while executing add_node query" + + num_nodes += 1 + + # Bring worker back to life. + print("Starting worker {}".format(i)) + self.start_worker(i) + + assert self.execute_step("drop") == 0, \ + "Error while executing drop query" + + assert self.execute_step("check", expected_result=num_nodes) == 0, \ + "Error while executing check query" + + +def find_correct_path(path): + f = os.path.join(PROJECT_DIR, "build", path) + if not os.path.exists(f): + f = os.path.join(PROJECT_DIR, "build_debug", path) + return f + + +if __name__ == "__main__": + memgraph_binary = find_correct_path("memgraph_ha") + tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", + "constraints", "tester")) + + raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", + "constraints", "raft.json") + + parser = argparse.ArgumentParser() + parser.add_argument("--memgraph", default=memgraph_binary) + parser.add_argument("--raft_config_file", default=raft_config_file) + args = parser.parse_args() + + for cluster_size in [3, 5]: + print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) + HaIndexTest( + args.memgraph, tester_binary, args.raft_config_file, cluster_size) + print("\033[1;32m~~ The test finished successfully ~~\033[0m") + + sys.exit(0) diff --git a/tests/integration/ha/constraints/tester.cpp b/tests/integration/ha/constraints/tester.cpp new file mode 100644 index 000000000..c12fec6a0 --- /dev/null +++ b/tests/integration/ha/constraints/tester.cpp @@ -0,0 +1,99 @@ +#include <chrono> +#include <thread> +#include <vector> + +#include <fmt/format.h> +#include <gflags/gflags.h> +#include <glog/logging.h> + +#include "communication/bolt/ha_client.hpp" +#include "io/network/endpoint.hpp" +#include "io/network/utils.hpp" +#include "utils/timer.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_int32(port, 7687, "Server port"); +DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); +DEFINE_int32(num_retries, 20, "Number of (leader) execution retries."); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); + +DEFINE_string(step, "", + "The step to execute (available: create, check, add_node, drop"); +DEFINE_int32(property_value, 0, "Value of the property when creating a node."); +DEFINE_int32(expected_status, 0, + "Expected query execution status when creating a node, 0 is success"); +DEFINE_int32(expected_result, 0, "Expected query result"); + +using namespace std::chrono_literals; + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + communication::Init(); + + try { + std::vector<io::network::Endpoint> endpoints; + for (int i = 0; i < FLAGS_cluster_size; ++i) { + uint16_t port = FLAGS_port + i; + io::network::Endpoint endpoint{FLAGS_address, port}; + endpoints.push_back(endpoint); + } + + std::chrono::milliseconds retry_delay(1000); + communication::ClientContext context(FLAGS_use_ssl); + communication::bolt::HAClient client(endpoints, &context, FLAGS_username, + FLAGS_password, FLAGS_num_retries, + retry_delay); + + if (FLAGS_step == "create") { + client.Execute("create constraint on (n:Node) assert n.prop is unique", + {}); + return 0; + } else if (FLAGS_step == "drop") { + client.Execute("drop constraint on (n:Node) assert n.prop is unique", {}); + return 0; + } else if (FLAGS_step == "add_node") { + client.Execute( + fmt::format("create (:Node{{prop:{}}})", FLAGS_property_value), {}); + + if (FLAGS_expected_status == 0) { + return 0; + } else { + LOG(WARNING) << "Query execution should've fail but it didn't."; + } + + } else if (FLAGS_step == "check") { + auto result = client.Execute("match (n) return n", {}); + + if (result.records.size() != FLAGS_expected_result) { + LOG(WARNING) << "Unexpected number of nodes: " + << "expected " << FLAGS_expected_result + << ", got " << result.records.size(); + return 2; + } + return 0; + + } else { + LOG(FATAL) << "Unexpected client step!"; + } + } catch (const communication::bolt::ClientQueryException &e) { + // Sometimes we expect the query to fail, so we need to handle this as + // success. + if (FLAGS_expected_status == 0) { + LOG(WARNING) << "There was some transient error during query execution."; + } else { + LOG(INFO) << "Query execution failed as expected, message: " << e.what(); + return 0; + } + } catch (const communication::bolt::ClientFatalException &) { + LOG(WARNING) << "Failed to communicate with the leader."; + } catch (const utils::BasicException &) { + LOG(WARNING) << "Error while executing query."; + } + + + return 1; +}