Add constraint in HA
Summary: HA now supports unique constraints in the same way the single-node (SM) version does. So far I have only tested this manually; a new integration test (tests/integration/ha/constraints) is added as part of this change.

Reviewers: ipaljak, vkasljevic, mferencevic

Reviewed By: ipaljak, mferencevic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2083
parent d7acf75a78
commit 87423eed06
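For reference, the Cypher surface this adds on an HA cluster, mirroring the queries the new integration tester issues below (the label and property names are just examples):

    CREATE CONSTRAINT ON (n:Node) ASSERT n.prop IS UNIQUE;
    CREATE (:Node {prop: 1});
    CREATE (:Node {prop: 1});  // rejected: violates the unique constraint
    DROP CONSTRAINT ON (n:Node) ASSERT n.prop IS UNIQUE;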
@@ -271,6 +271,8 @@ set(mg_single_node_ha_sources
    query/plan/variable_start_planner.cpp
    query/repl.cpp
    query/typed_value.cpp
    storage/common/constraints/record.cpp
    storage/common/constraints/unique_constraints.cpp
    storage/common/types/property_value.cpp
    storage/common/types/slk.cpp
    storage/common/types/property_value_store.cpp

@@ -198,24 +198,142 @@ void GraphDbAccessor::DeleteIndex(storage::Label label,
  }
}

void GraphDbAccessor::UpdateLabelIndices(storage::Label label,
                                         const VertexAccessor &vertex_accessor,
                                         const Vertex *const vertex) {
void GraphDbAccessor::BuildUniqueConstraint(
    storage::Label label, const std::vector<storage::Property> &properties) {
  DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";

  storage::constraints::ConstraintEntry entry{label, properties};
  if (!db_->storage().unique_constraints_.AddConstraint(entry)) {
    // Already exists
    return;
  }

  try {
    auto dba = db_->AccessBlocking(std::make_optional(transaction().id_));

    for (auto v : dba.Vertices(false)) {
      if (std::find(v.labels().begin(), v.labels().end(), label) !=
          v.labels().end()) {
        db_->storage().unique_constraints_.Update(v, dba.transaction());
      }
    }

    std::vector<std::string> property_names(properties.size());
    std::transform(properties.begin(), properties.end(), property_names.begin(),
                   [&dba](storage::Property property) {
                     return dba.PropertyName(property);
                   });

    dba.raft()->Emplace(database::StateDelta::BuildUniqueConstraint(
        dba.transaction().id_, label, dba.LabelName(label), properties,
        property_names));

    dba.Commit();

  } catch (const tx::TransactionEngineError &e) {
    db_->storage().unique_constraints_.RemoveConstraint(entry);
    throw TransactionException(e.what());
  } catch (const storage::constraints::ViolationException &e) {
    db_->storage().unique_constraints_.RemoveConstraint(entry);
    throw ConstraintViolationException(e.what());
  } catch (const storage::constraints::SerializationException &e) {
    db_->storage().unique_constraints_.RemoveConstraint(entry);
    throw mvcc::SerializationError();
  } catch (...) {
    db_->storage().unique_constraints_.RemoveConstraint(entry);
    throw;
  }
}

void GraphDbAccessor::DeleteUniqueConstraint(
    storage::Label label, const std::vector<storage::Property> &properties) {
  storage::constraints::ConstraintEntry entry{label, properties};
  try {
    auto dba = db_->AccessBlocking(std::make_optional(transaction().id_));

    if (!db_->storage().unique_constraints_.RemoveConstraint(entry)) {
      // Nothing was deleted
      return;
    }

    std::vector<std::string> property_names(properties.size());
    std::transform(properties.begin(), properties.end(), property_names.begin(),
                   [&dba](storage::Property property) {
                     return dba.PropertyName(property);
                   });

    dba.raft()->Emplace(database::StateDelta::DropUniqueConstraint(
        dba.transaction().id_, label, dba.LabelName(label), properties,
        property_names));

    dba.Commit();
  } catch (const tx::TransactionEngineError &e) {
    throw TransactionException(e.what());
  }
}

std::vector<storage::constraints::ConstraintEntry>
GraphDbAccessor::ListUniqueConstraints() const {
  return db_->storage().unique_constraints_.ListConstraints();
}

void GraphDbAccessor::UpdateOnAddLabel(storage::Label label,
                                       const VertexAccessor &vertex_accessor,
                                       const Vertex *vertex) {
  DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
  auto *vlist_ptr = vertex_accessor.address();

  try {
    db_->storage().unique_constraints_.UpdateOnAddLabel(label, vertex_accessor,
                                                        transaction());
  } catch (const storage::constraints::SerializationException &e) {
    throw mvcc::SerializationError();
  } catch (const storage::constraints::ViolationException &e) {
    throw ConstraintViolationException(e.what());
  }

  db_->storage().label_property_index_.UpdateOnLabel(label, vlist_ptr, vertex);
  db_->storage().labels_index_.Update(label, vlist_ptr, vertex);
}

void GraphDbAccessor::UpdatePropertyIndex(
    storage::Property property, const RecordAccessor<Vertex> &vertex_accessor,
    const Vertex *const vertex) {
void GraphDbAccessor::UpdateOnRemoveLabel(
    storage::Label label, const RecordAccessor<Vertex> &accessor) {
  db_->storage().unique_constraints_.UpdateOnRemoveLabel(label, accessor,
                                                         transaction());
}

void GraphDbAccessor::UpdateOnAddProperty(
    storage::Property property, const PropertyValue &previous_value,
    const PropertyValue &new_value,
    const RecordAccessor<Vertex> &vertex_accessor, const Vertex *vertex) {
  DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";

  try {
    db_->storage().unique_constraints_.UpdateOnAddProperty(
        property, previous_value, new_value, vertex_accessor, transaction());
  } catch (const storage::constraints::SerializationException &e) {
    throw mvcc::SerializationError();
  } catch (const storage::constraints::ViolationException &e) {
    throw ConstraintViolationException(e.what());
  }

  db_->storage().label_property_index_.UpdateOnProperty(
      property, vertex_accessor.address(), vertex);
}

void GraphDbAccessor::UpdateOnRemoveProperty(
    storage::Property property, const PropertyValue &previous_value,
    const RecordAccessor<Vertex> &accessor, const Vertex *vertex) {
  DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";

  try {
    db_->storage().unique_constraints_.UpdateOnRemoveProperty(
        property, previous_value, accessor, transaction());
  } catch (const storage::constraints::SerializationException &e) {
    throw mvcc::SerializationError();
  }
}

int64_t GraphDbAccessor::VerticesCount() const {
  DCHECK(!commited_ && !aborted_) << "Accessor committed or aborted";
  return db_->storage().vertices_.access().size();

@@ -13,6 +13,7 @@

#include "database/single_node_ha/graph_db.hpp"
#include "raft/raft_interface.hpp"
#include "storage/common/constraints/exceptions.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/edge_accessor.hpp"
#include "storage/single_node_ha/vertex_accessor.hpp"

@@ -432,6 +433,32 @@ class GraphDbAccessor {
  /// Writes Index (key) creation to Raft, marks it as ready for usage
  void EnableIndex(const LabelPropertyIndex::Key &key);

  /**
   * Creates a new unique constraint that consists of a label and multiple
   * properties.
   * If the constraint already exists, this method does nothing.
   *
   * @throws ConstraintViolationException if the constraint couldn't be built
   *         due to an existing constraint violation.
   * @throws TransactionEngineError if the engine doesn't accept transactions.
   * @throws mvcc::SerializationError on serialization errors.
   */
  void BuildUniqueConstraint(storage::Label label,
                             const std::vector<storage::Property> &properties);

  /**
   * Deletes an existing unique constraint.
   * If the constraint doesn't exist, this method does nothing.
   */
  void DeleteUniqueConstraint(storage::Label label,
                              const std::vector<storage::Property> &properties);

  /**
   * Returns a list of currently active unique constraints.
   */
  std::vector<storage::constraints::ConstraintEntry> ListUniqueConstraints()
      const;

  /**
   * @brief - Returns true if the given label+property index already exists and
   * is ready for use.

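A minimal usage sketch of the accessor API above (illustrative only: it assumes a single-node-HA `database::GraphDb db` whose `Access()` returns an accessor by value, the same way `AccessBlocking` is used in the .cpp hunk, and the label/property names are hypothetical):

    // Sketch: build, list and drop a unique constraint on :Node(prop).
    auto dba = db.Access();  // assumed to return a GraphDbAccessor
    auto label = dba.Label("Node");
    std::vector<storage::Property> props{dba.Property("prop")};

    dba.BuildUniqueConstraint(label, props);  // no-op if it already exists
    for (const auto &entry : dba.ListUniqueConstraints()) {
      // ConstraintEntry carries the label and the property set.
      LOG(INFO) << "unique constraint on :" << dba.LabelName(entry.label);
    }
    dba.DeleteUniqueConstraint(label, props);  // no-op if it doesn't exist
    dba.Commit();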
@@ -608,15 +635,52 @@ class GraphDbAccessor {
  bool aborted_{false};

  /**
   * Insert this vertex into corresponding any label + 'property' index.
   * @param property - vertex will be inserted into indexes which contain this
   *        property
   * @param vertex_accessor - vertex accessor to insert
   * @param vertex - vertex to insert
   * Notifies storage about label addition.
   *
   * @param label - label that was added
   * @param vertex_accessor - vertex_accessor that was updated
   * @param vertex - vertex that was updated
   */
  void UpdatePropertyIndex(storage::Property property,
  void UpdateOnAddLabel(storage::Label label,
                        const VertexAccessor &vertex_accessor,
                        const Vertex *vertex);

  /**
   * Notifies storage about label removal.
   *
   * @param label - label that was removed
   * @param vertex_accessor - vertex_accessor that was updated
   */
  void UpdateOnRemoveLabel(storage::Label label,
                           const RecordAccessor<Vertex> &accessor);

  /**
   * Notifies storage about a property removal.
   *
   * @param property - property that was removed
   * @param previous_value - previous value of the property
   * @param vertex_accessor - vertex_accessor that was updated
   * @param vertex - vertex that was updated
   */
  void UpdateOnRemoveProperty(storage::Property property,
                              const PropertyValue &previous_value,
                              const RecordAccessor<Vertex> &accessor,
                              const Vertex *vertex);

  /**
   * Notifies storage about a property addition.
   *
   * @param property - property that was added
   * @param previous_value - previous value of the property
   * @param new_value - new value of the property
   * @param vertex_accessor - vertex accessor that was updated
   * @param vertex - vertex that was updated
   */
  void UpdateOnAddProperty(storage::Property property,
                           const PropertyValue &previous_value,
                           const PropertyValue &new_value,
                           const RecordAccessor<Vertex> &vertex_accessor,
                           const Vertex *const vertex);
                           const Vertex *vertex);
};

} // namespace database

@@ -129,6 +129,32 @@ StateDelta StateDelta::NoOp(tx::TransactionId tx_id) {
  return op;
}

StateDelta StateDelta::BuildUniqueConstraint(
    tx::TransactionId tx_id, storage::Label label,
    const std::string &label_name,
    const std::vector<storage::Property> &properties,
    const std::vector<std::string> &property_names) {
  StateDelta op(StateDelta::Type::BUILD_UNIQUE_CONSTRAINT, tx_id);
  op.label = label;
  op.label_name = label_name;
  op.properties = properties;
  op.property_names = property_names;
  return op;
}

StateDelta StateDelta::DropUniqueConstraint(
    tx::TransactionId tx_id, storage::Label label,
    const std::string &label_name,
    const std::vector<storage::Property> &properties,
    const std::vector<std::string> &property_names) {
  StateDelta op(StateDelta::Type::DROP_UNIQUE_CONSTRAINT, tx_id);
  op.label = label;
  op.label_name = label_name;
  op.properties = properties;
  op.property_names = property_names;
  return op;
}

void StateDelta::Encode(
    HashedFileWriter &writer,
    communication::bolt::BaseEncoder<HashedFileWriter> &encoder) const {

@@ -187,6 +213,28 @@ void StateDelta::Encode(
      encoder.WriteInt(property.Id());
      encoder.WriteString(property_name);
      break;
    case Type::BUILD_UNIQUE_CONSTRAINT:
      encoder.WriteInt(label.Id());
      encoder.WriteString(label_name);
      encoder.WriteInt(properties.size());
      for (auto prop : properties) {
        encoder.WriteInt(prop.Id());
      }
      for (auto &name : property_names) {
        encoder.WriteString(name);
      }
      break;
    case Type::DROP_UNIQUE_CONSTRAINT:
      encoder.WriteInt(label.Id());
      encoder.WriteString(label_name);
      encoder.WriteInt(properties.size());
      for (auto prop : properties) {
        encoder.WriteInt(prop.Id());
      }
      for (auto &name : property_names) {
        encoder.WriteString(name);
      }
      break;
  }

  writer.WriteValue(writer.hash());

@@ -268,6 +316,38 @@ std::optional<StateDelta> StateDelta::Decode(
      DECODE_MEMBER_CAST(property, ValueInt, storage::Property)
      DECODE_MEMBER(property_name, ValueString)
      break;
    case Type::BUILD_UNIQUE_CONSTRAINT: {
      DECODE_MEMBER_CAST(label, ValueInt, storage::Label)
      DECODE_MEMBER(label_name, ValueString)
      if (!decoder.ReadValue(&dv)) return nullopt;
      int64_t size = dv.ValueInt();
      for (int64_t i = 0; i < size; ++i) {
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.properties.push_back(
            static_cast<storage::Property>(dv.ValueInt()));
      }
      for (int64_t i = 0; i < size; ++i) {
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.property_names.push_back(dv.ValueString());
      }
      break;
    }
    case Type::DROP_UNIQUE_CONSTRAINT: {
      DECODE_MEMBER_CAST(label, ValueInt, storage::Label)
      DECODE_MEMBER(label_name, ValueString)
      if (!decoder.ReadValue(&dv)) return nullopt;
      int64_t size = dv.ValueInt();
      for (int64_t i = 0; i < size; ++i) {
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.properties.push_back(
            static_cast<storage::Property>(dv.ValueInt()));
      }
      for (int64_t i = 0; i < size; ++i) {
        if (!decoder.ReadValue(&dv)) return nullopt;
        r_val.property_names.push_back(dv.ValueString());
      }
      break;
    }
  }

  auto decoder_hash = reader.hash();

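The Encode/Decode pair fixes the serialized layout of both new delta types: label id, label name, one property count, then the property ids followed by the property names (the two vectors always have equal length, so the count is written only once). A self-contained round-trip sketch of that layout, with a plain `std::variant` stream standing in for the Bolt encoder/decoder:

    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <variant>
    #include <vector>

    // Stand-in for the Bolt value stream used by StateDelta::Encode/Decode.
    using Value = std::variant<int64_t, std::string>;

    // Mirrors the BUILD/DROP_UNIQUE_CONSTRAINT branch of Encode:
    // label id, label name, count, property ids..., property names...
    std::vector<Value> EncodeConstraintDelta(
        int64_t label, const std::string &label_name,
        const std::vector<int64_t> &prop_ids,
        const std::vector<std::string> &prop_names) {
      std::vector<Value> out{label, label_name,
                             static_cast<int64_t>(prop_ids.size())};
      for (auto id : prop_ids) out.emplace_back(id);
      for (const auto &name : prop_names) out.emplace_back(name);
      return out;
    }

    int main() {
      auto stream = EncodeConstraintDelta(1, "Node", {7}, {"prop"});

      // Decode reads the count once, then both vectors, as above.
      size_t pos = 0;
      int64_t label = std::get<int64_t>(stream[pos++]);
      std::string label_name = std::get<std::string>(stream[pos++]);
      int64_t size = std::get<int64_t>(stream[pos++]);
      std::vector<int64_t> prop_ids;
      for (int64_t i = 0; i < size; ++i)
        prop_ids.push_back(std::get<int64_t>(stream[pos++]));
      std::vector<std::string> prop_names;
      for (int64_t i = 0; i < size; ++i)
        prop_names.push_back(std::get<std::string>(stream[pos++]));

      assert(label == 1 && label_name == "Node");
      assert(prop_ids == std::vector<int64_t>{7});
      assert(prop_names == std::vector<std::string>{"prop"});
      return 0;
    }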
@@ -342,6 +422,25 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
    }
    case Type::NO_OP:
      break;
    case Type::BUILD_UNIQUE_CONSTRAINT: {
      std::vector<storage::Property> properties;
      properties.reserve(property_names.size());
      for (auto &p : property_names) {
        properties.push_back(dba.Property(p));
      }

      dba.BuildUniqueConstraint(dba.Label(label_name), properties);
    } break;
    case Type::DROP_UNIQUE_CONSTRAINT: {
      std::vector<storage::Property> properties;
      properties.reserve(property_names.size());
      for (auto &p : property_names) {
        properties.push_back(dba.Property(p));
      }

      dba.DeleteUniqueConstraint(dba.Label(label_name), properties);
    } break;
  }
}

@@ -35,6 +35,8 @@ cpp<#
 (edge-type-name "std::string")
 (property "storage::Property")
 (property-name "std::string")
 (properties "std::vector<storage::Property>")
 (property-names "std::vector<std::string>")
 (value "PropertyValue" :initval "PropertyValue::Null")
 (label "storage::Label")
 (label-name "std::string")

@@ -67,6 +69,8 @@ in StateDeltas.")
 build-index ;; label, label_name, property, property_name
 drop-index ;; label, label_name, property, property_name
 no-op ;; no-op state delta required by Raft protocol
 build-unique-constraint ;; label, label_name, properties, property_names
 drop-unique-constraint ;; label, label_name, properties, property_names
 )
(:documentation
 "Defines StateDelta type. For each type the comment indicates which values

@@ -126,8 +130,17 @@ omitted in the comment.")
      const std::string &label_name,
      storage::Property property,
      const std::string &property_name);

  static StateDelta NoOp(tx::TransactionId tx_id);
  static StateDelta BuildUniqueConstraint(
      tx::TransactionId tx_id, storage::Label label,
      const std::string &label_name,
      const std::vector<storage::Property> &properties,
      const std::vector<std::string> &property_names);
  static StateDelta DropUniqueConstraint(
      tx::TransactionId tx_id, storage::Label label,
      const std::string &label_name,
      const std::vector<storage::Property> &properties,
      const std::vector<std::string> &property_names);

  /// Applies CRUD delta to database accessor. Fails on other types of deltas
  void Apply(GraphDbAccessor &dba) const;

@@ -624,7 +624,7 @@ Callback HandleInfoQuery(InfoQuery *info_query,
      };
      break;
    case InfoQuery::InfoType::CONSTRAINT:
#ifdef MG_SINGLE_NODE
#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA)
      callback.header = {"constraint type", "label", "properties"};
      callback.fn = [db_accessor] {
        std::vector<std::vector<TypedValue>> results;

@@ -669,7 +669,7 @@ Callback HandleInfoQuery(InfoQuery *info_query,

Callback HandleConstraintQuery(ConstraintQuery *constraint_query,
                               database::GraphDbAccessor *db_accessor) {
#ifdef MG_SINGLE_NODE
#if defined(MG_SINGLE_NODE) || defined(MG_SINGLE_NODE_HA)
  std::vector<storage::Property> properties;
  auto label = db_accessor->Label(constraint_query->constraint_.label.name);
  properties.reserve(constraint_query->constraint_.properties.size());

@@ -25,9 +25,10 @@ void RecordAccessor<Vertex>::PropsSet(storage::Property key,
  auto &dba = db_accessor();
  auto delta = StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
                                          dba.PropertyName(key), value);
  auto previous_value = PropsAt(key);
  update().properties_.set(key, value);
  dba.UpdatePropertyIndex(key, *this, &update());
  db_accessor().raft()->Emplace(delta);
  dba.UpdateOnAddProperty(key, previous_value, value, *this, &update());
  dba.raft()->Emplace(delta);
}

template <>
@@ -38,7 +39,7 @@ void RecordAccessor<Edge>::PropsSet(storage::Property key,
                                          dba.PropertyName(key), value);

  update().properties_.set(key, value);
  db_accessor().raft()->Emplace(delta);
  dba.raft()->Emplace(delta);
}

template <>
@@ -47,8 +48,10 @@ void RecordAccessor<Vertex>::PropsErase(storage::Property key) {
  auto delta =
      StateDelta::PropsSetVertex(dba.transaction_id(), gid(), key,
                                 dba.PropertyName(key), PropertyValue::Null);
  auto previous_value = PropsAt(key);
  update().properties_.set(key, PropertyValue::Null);
  db_accessor().raft()->Emplace(delta);
  dba.UpdateOnRemoveProperty(key, previous_value, *this, &update());
  dba.raft()->Emplace(delta);
}

template <>
@@ -58,7 +61,7 @@ void RecordAccessor<Edge>::PropsErase(storage::Property key) {
      StateDelta::PropsSetEdge(dba.transaction_id(), gid(), key,
                               dba.PropertyName(key), PropertyValue::Null);
  update().properties_.set(key, PropertyValue::Null);
  db_accessor().raft()->Emplace(delta);
  dba.raft()->Emplace(delta);
}

template <typename TRecord>

@@ -149,6 +149,10 @@ class RecordAccessor {
   */
  int64_t CypherId() const;

  /** Returns the current version (either new_ or old_) set on this
   * RecordAccessor. */
  const TRecord &current() const;

 protected:
  /**
   * Pointer to the version (either old_ or new_) that READ operations
@@ -160,10 +164,6 @@ class RecordAccessor {
   */
  mutable TRecord *current_{nullptr};

  /** Returns the current version (either new_ or old_) set on this
   * RecordAccessor. */
  const TRecord &current() const;

 private:
  // The database accessor for which this record accessor is created
  // Provides means of getting to the transaction and database functions.

@@ -4,6 +4,7 @@
#include <optional>

#include "data_structures/concurrent/concurrent_map.hpp"
#include "storage/common/constraints/unique_constraints.hpp"
#include "storage/common/kvstore/kvstore.hpp"
#include "storage/common/types/types.hpp"
#include "storage/single_node_ha/edge.hpp"
@@ -75,6 +76,9 @@ class Storage {
  KeyIndex<storage::Label, Vertex> labels_index_;
  LabelPropertyIndex label_property_index_;

  // unique constraints
  storage::constraints::UniqueConstraints unique_constraints_;

  std::vector<std::string> properties_on_disk_;

  /// Gets the Vertex/Edge main storage map.

@@ -96,7 +96,7 @@ class StorageGc {

    vertices_.gc_.Run(snapshot_gc, tx_engine_);
    edges_.gc_.Run(snapshot_gc, tx_engine_);

    storage_.unique_constraints_.Refresh(snapshot_gc, tx_engine_);
    VLOG(21) << "Garbage collector mvcc phase time: " << x.Elapsed().count();
  }
  // This has to be run sequentially after gc because gc modifies

@@ -25,7 +25,7 @@ void VertexAccessor::add_label(storage::Label label) {
  if (!utils::Contains(vertex.labels_, label)) {
    vertex.labels_.emplace_back(label);
    dba.raft()->Emplace(delta);
    dba.UpdateLabelIndices(label, *this, &vertex);
    dba.UpdateOnAddLabel(label, *this, &vertex);
  }
}

@@ -40,6 +40,7 @@ void VertexAccessor::remove_label(storage::Label label) {
    std::swap(*found, labels.back());
    labels.pop_back();
    dba.raft()->Emplace(delta);
    dba.UpdateOnRemoveLabel(label, *this);
  }
}

@@ -19,6 +19,9 @@ add_subdirectory(distributed)
# distributed ha/basic binaries
add_subdirectory(ha/basic)

# distributed ha/constraints binaries
add_subdirectory(ha/constraints)

# distributed ha/index binaries
add_subdirectory(ha/index)

@@ -82,6 +82,16 @@
      - ../../../../build_debug/memgraph_ha # memgraph ha binary
      - ../../../../build_debug/tests/integration/ha/basic/tester # tester binary

  - name: integration__ha_constraints
    cd: ha/constraints
    commands: ./runner.py
    infiles:
      - runner.py # runner script
      - raft.json # raft configuration
      - ../ha_test.py # raft test base module
      - ../../../../build_debug/memgraph_ha # memgraph ha binary
      - ../../../../build_debug/tests/integration/ha/constraints/tester # tester binary

  - name: integration__ha_index
    cd: ha/index
    commands: ./runner.py

tests/integration/ha/constraints/CMakeLists.txt (new file, 6 lines)
@@ -0,0 +1,6 @@
set(target_name memgraph__integration__ha_constraints)
set(tester_target_name ${target_name}__tester)

add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)

tests/integration/ha/constraints/raft.json (new file, 7 lines)
@@ -0,0 +1,7 @@
{
  "election_timeout_min": 200,
  "election_timeout_max": 500,
  "heartbeat_interval": 100,
  "replication_timeout": 10000,
  "log_size_snapshot_threshold": -1
}

tests/integration/ha/constraints/runner.py (new executable file, 122 lines)
@@ -0,0 +1,122 @@
#!/usr/bin/python3

import argparse
import os
import time
import random
import subprocess
import sys

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase


class HaConstraintsTest(HaTestBase):
    def execute_step(self, step, property_value=None, expected_status=None,
                     expected_result=None):
        if step == "create":
            print("Executing create query")
            client = subprocess.Popen([self.tester_binary, "--step", "create",
                "--cluster_size", str(self.cluster_size)])

        elif step == "drop":
            print("Executing drop query")
            client = subprocess.Popen([self.tester_binary, "--step", "drop",
                "--cluster_size", str(self.cluster_size)])
        elif step == "add_node":
            print("Executing add_node query")
            client = subprocess.Popen([self.tester_binary, "--step", "add_node",
                "--cluster_size", str(self.cluster_size), "--expected_status",
                str(expected_status), "--property_value", str(property_value)])

        elif step == "check":
            print("Executing check query")
            client = subprocess.Popen([self.tester_binary, "--step", "check",
                "--cluster_size", str(self.cluster_size), "--expected_result",
                str(expected_result)])
        else:
            raise ValueError("Invalid step argument: " + step)

        # Check what happened with query execution.
        try:
            code = client.wait(timeout=30)
        except subprocess.TimeoutExpired:
            print("HA client timed out!")
            client.kill()
            return 1

        return code

    def execute(self):
        self.start_cluster()
        num_nodes = 1

        assert self.execute_step("add_node", expected_status=0,
                                 property_value=num_nodes) == 0, \
            "Error while executing add_node query"

        assert self.execute_step("create") == 0, \
            "Error while executing create query"

        assert self.execute_step("check", expected_result=num_nodes) == 0, \
            "Error while executing check query"

        for i in range(self.cluster_size):
            # Kill worker.
            print("Killing worker {}".format(i))
            self.kill_worker(i)

            assert self.execute_step("add_node", expected_status=1,
                                     property_value=num_nodes) == 0, \
                "Error while executing add_node query"

            assert self.execute_step("add_node", expected_status=0,
                                     property_value=num_nodes + 1) == 0, \
                "Error while executing add_node query"

            num_nodes += 1

            # Bring worker back to life.
            print("Starting worker {}".format(i))
            self.start_worker(i)

        assert self.execute_step("drop") == 0, \
            "Error while executing drop query"

        assert self.execute_step("check", expected_result=num_nodes) == 0, \
            "Error while executing check query"


def find_correct_path(path):
    f = os.path.join(PROJECT_DIR, "build", path)
    if not os.path.exists(f):
        f = os.path.join(PROJECT_DIR, "build_debug", path)
    return f


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
                                                   "constraints", "tester"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
                                    "constraints", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m"
              % cluster_size)
        HaConstraintsTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)

tests/integration/ha/constraints/tester.cpp (new file, 99 lines)
@@ -0,0 +1,99 @@
#include <chrono>
#include <thread>
#include <vector>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_int32(num_retries, 20, "Number of (leader) execution retries.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");

DEFINE_string(step, "",
              "The step to execute (available: create, check, add_node, drop)");
DEFINE_int32(property_value, 0, "Value of the property when creating a node.");
DEFINE_int32(expected_status, 0,
             "Expected query execution status when creating a node; 0 is success");
DEFINE_int32(expected_result, 0, "Expected query result");

using namespace std::chrono_literals;

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  communication::Init();

  try {
    std::vector<io::network::Endpoint> endpoints;
    for (int i = 0; i < FLAGS_cluster_size; ++i) {
      uint16_t port = FLAGS_port + i;
      io::network::Endpoint endpoint{FLAGS_address, port};
      endpoints.push_back(endpoint);
    }

    std::chrono::milliseconds retry_delay(1000);
    communication::ClientContext context(FLAGS_use_ssl);
    communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                         FLAGS_password, FLAGS_num_retries,
                                         retry_delay);

    if (FLAGS_step == "create") {
      client.Execute("create constraint on (n:Node) assert n.prop is unique",
                     {});
      return 0;
    } else if (FLAGS_step == "drop") {
      client.Execute("drop constraint on (n:Node) assert n.prop is unique", {});
      return 0;
    } else if (FLAGS_step == "add_node") {
      client.Execute(
          fmt::format("create (:Node{{prop:{}}})", FLAGS_property_value), {});

      if (FLAGS_expected_status == 0) {
        return 0;
      } else {
        LOG(WARNING) << "Query execution should've failed but it didn't.";
      }

    } else if (FLAGS_step == "check") {
      auto result = client.Execute("match (n) return n", {});

      if (result.records.size() != FLAGS_expected_result) {
        LOG(WARNING) << "Unexpected number of nodes: "
                     << "expected " << FLAGS_expected_result
                     << ", got " << result.records.size();
        return 2;
      }
      return 0;

    } else {
      LOG(FATAL) << "Unexpected client step!";
    }
  } catch (const communication::bolt::ClientQueryException &e) {
    // Sometimes we expect the query to fail, so we need to handle this as
    // success.
    if (FLAGS_expected_status == 0) {
      LOG(WARNING) << "There was some transient error during query execution.";
    } else {
      LOG(INFO) << "Query execution failed as expected, message: " << e.what();
      return 0;
    }
  } catch (const communication::bolt::ClientFatalException &) {
    LOG(WARNING) << "Failed to communicate with the leader.";
  } catch (const utils::BasicException &) {
    LOG(WARNING) << "Error while executing query.";
  }

  return 1;
}