Improve performance on set properties (#1115)
This commit is contained in:
parent
1fe2190747
commit
509183e985
@ -24,6 +24,7 @@
|
||||
#include "query/typed_value.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/result.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
|
||||
@ -75,6 +76,20 @@ inline void ExpectType(const Symbol &symbol, const TypedValue &value, TypedValue
|
||||
throw QueryRuntimeException("Expected a {} for '{}', but got {}.", expected, symbol.name(), value.type());
|
||||
}
|
||||
|
||||
inline void ProcessError(const storage::Error error) {
|
||||
switch (error) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
throw TransactionSerializationException();
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw QueryRuntimeException("Trying to set properties on a deleted object.");
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw QueryRuntimeException("Unexpected error when setting a property.");
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
concept AccessorWithSetProperty = requires(T accessor, const storage::PropertyId key,
|
||||
const storage::PropertyValue new_value) {
|
||||
@ -89,17 +104,7 @@ storage::PropertyValue PropsSetChecked(T *record, const storage::PropertyId &key
|
||||
try {
|
||||
auto maybe_old_value = record->SetProperty(key, storage::PropertyValue(value));
|
||||
if (maybe_old_value.HasError()) {
|
||||
switch (maybe_old_value.GetError()) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
throw TransactionSerializationException();
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw QueryRuntimeException("Trying to set properties on a deleted object.");
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw QueryRuntimeException("Unexpected error when setting a property.");
|
||||
}
|
||||
ProcessError(maybe_old_value.GetError());
|
||||
}
|
||||
return std::move(*maybe_old_value);
|
||||
} catch (const TypedValueException &) {
|
||||
@ -121,17 +126,7 @@ bool MultiPropsInitChecked(T *record, std::map<storage::PropertyId, storage::Pro
|
||||
try {
|
||||
auto maybe_values = record->InitProperties(properties);
|
||||
if (maybe_values.HasError()) {
|
||||
switch (maybe_values.GetError()) {
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
throw TransactionSerializationException();
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw QueryRuntimeException("Trying to set properties on a deleted object.");
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw QueryRuntimeException("Unexpected error when setting a property.");
|
||||
}
|
||||
ProcessError(maybe_values.GetError());
|
||||
}
|
||||
return std::move(*maybe_values);
|
||||
} catch (const TypedValueException &) {
|
||||
@ -139,5 +134,31 @@ bool MultiPropsInitChecked(T *record, std::map<storage::PropertyId, storage::Pro
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
concept AccessorWithUpdateProperties = requires(T accessor,
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties) {
|
||||
{
|
||||
accessor.UpdateProperties(properties)
|
||||
} -> std::same_as<
|
||||
storage::Result<std::vector<std::tuple<storage::PropertyId, storage::PropertyValue, storage::PropertyValue>>>>;
|
||||
};
|
||||
|
||||
/// Set property `values` mapped with given `key` on a `record`.
|
||||
///
|
||||
/// @throw QueryRuntimeException if value cannot be set as a property value
|
||||
template <AccessorWithUpdateProperties T>
|
||||
auto UpdatePropertiesChecked(T *record, std::map<storage::PropertyId, storage::PropertyValue> &properties) ->
|
||||
typename std::remove_reference<decltype(record->UpdateProperties(properties).GetValue())>::type {
|
||||
try {
|
||||
auto maybe_values = record->UpdateProperties(properties);
|
||||
if (maybe_values.HasError()) {
|
||||
ProcessError(maybe_values.GetError());
|
||||
}
|
||||
return std::move(*maybe_values);
|
||||
} catch (const TypedValueException &) {
|
||||
throw QueryRuntimeException("Cannot update properties.");
|
||||
}
|
||||
}
|
||||
|
||||
int64_t QueryTimestamp();
|
||||
} // namespace memgraph::query
|
||||
|
@ -77,6 +77,11 @@ class EdgeAccessor final {
|
||||
return impl_.InitProperties(properties);
|
||||
}
|
||||
|
||||
storage::Result<std::vector<std::tuple<storage::PropertyId, storage::PropertyValue, storage::PropertyValue>>>
|
||||
UpdateProperties(std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
|
||||
return impl_.UpdateProperties(properties);
|
||||
}
|
||||
|
||||
storage::Result<storage::PropertyValue> RemoveProperty(storage::PropertyId key) {
|
||||
return SetProperty(key, storage::PropertyValue());
|
||||
}
|
||||
@ -135,6 +140,11 @@ class VertexAccessor final {
|
||||
return impl_.InitProperties(properties);
|
||||
}
|
||||
|
||||
storage::Result<std::vector<std::tuple<storage::PropertyId, storage::PropertyValue, storage::PropertyValue>>>
|
||||
UpdateProperties(std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
|
||||
return impl_.UpdateProperties(properties);
|
||||
}
|
||||
|
||||
storage::Result<storage::PropertyValue> RemoveProperty(storage::PropertyId key) {
|
||||
return SetProperty(key, storage::PropertyValue());
|
||||
}
|
||||
|
@ -2711,9 +2711,11 @@ namespace {
|
||||
|
||||
template <typename T>
|
||||
concept AccessorWithProperties = requires(T value, storage::PropertyId property_id,
|
||||
storage::PropertyValue property_value) {
|
||||
storage::PropertyValue property_value,
|
||||
std::map<storage::PropertyId, storage::PropertyValue> properties) {
|
||||
{ value.ClearProperties() } -> std::same_as<storage::Result<std::map<storage::PropertyId, storage::PropertyValue>>>;
|
||||
{value.SetProperty(property_id, property_value)};
|
||||
{value.UpdateProperties(properties)};
|
||||
};
|
||||
|
||||
/// Helper function that sets the given values on either a Vertex or an Edge.
|
||||
@ -2723,7 +2725,8 @@ concept AccessorWithProperties = requires(T value, storage::PropertyId property_
|
||||
template <AccessorWithProperties TRecordAccessor>
|
||||
void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetProperties::Op op,
|
||||
ExecutionContext *context) {
|
||||
std::optional<std::map<storage::PropertyId, storage::PropertyValue>> old_values;
|
||||
using PropertiesMap = std::map<storage::PropertyId, storage::PropertyValue>;
|
||||
std::optional<PropertiesMap> old_values;
|
||||
const bool should_register_change =
|
||||
context->trigger_context_collector &&
|
||||
context->trigger_context_collector->ShouldRegisterObjectPropertyChange<TRecordAccessor>();
|
||||
@ -2782,44 +2785,34 @@ void SetPropertiesOnRecord(TRecordAccessor *record, const TypedValue &rhs, SetPr
|
||||
*record, key, TypedValue(std::move(old_value)), TypedValue(std::forward<decltype(new_value)>(new_value)));
|
||||
};
|
||||
|
||||
auto set_props = [&, record](auto properties) {
|
||||
for (auto &kv : properties) {
|
||||
auto maybe_error = record->SetProperty(kv.first, kv.second);
|
||||
if (maybe_error.HasError()) {
|
||||
switch (maybe_error.GetError()) {
|
||||
case storage::Error::DELETED_OBJECT:
|
||||
throw QueryRuntimeException("Trying to set properties on a deleted graph element.");
|
||||
case storage::Error::SERIALIZATION_ERROR:
|
||||
throw TransactionSerializationException();
|
||||
case storage::Error::PROPERTIES_DISABLED:
|
||||
throw QueryRuntimeException("Can't set property because properties on edges are disabled.");
|
||||
case storage::Error::VERTEX_HAS_EDGES:
|
||||
case storage::Error::NONEXISTENT_OBJECT:
|
||||
throw QueryRuntimeException("Unexpected error when setting properties.");
|
||||
}
|
||||
}
|
||||
auto update_props = [&, record](PropertiesMap &new_properties) {
|
||||
auto updated_properties = UpdatePropertiesChecked(record, new_properties);
|
||||
|
||||
if (should_register_change) {
|
||||
register_set_property(std::move(*maybe_error), kv.first, std::move(kv.second));
|
||||
if (should_register_change) {
|
||||
for (const auto &[id, old_value, new_value] : updated_properties) {
|
||||
register_set_property(std::move(old_value), id, std::move(new_value));
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
switch (rhs.type()) {
|
||||
case TypedValue::Type::Edge:
|
||||
set_props(get_props(rhs.ValueEdge()));
|
||||
case TypedValue::Type::Edge: {
|
||||
PropertiesMap new_properties = get_props(rhs.ValueEdge());
|
||||
update_props(new_properties);
|
||||
break;
|
||||
case TypedValue::Type::Vertex:
|
||||
set_props(get_props(rhs.ValueVertex()));
|
||||
}
|
||||
case TypedValue::Type::Vertex: {
|
||||
PropertiesMap new_properties = get_props(rhs.ValueVertex());
|
||||
update_props(new_properties);
|
||||
break;
|
||||
}
|
||||
case TypedValue::Type::Map: {
|
||||
for (const auto &kv : rhs.ValueMap()) {
|
||||
auto key = context->db_accessor->NameToProperty(kv.first);
|
||||
auto old_value = PropsSetChecked(record, key, kv.second);
|
||||
if (should_register_change) {
|
||||
register_set_property(std::move(old_value), key, kv.second);
|
||||
}
|
||||
PropertiesMap new_properties;
|
||||
for (const auto &[prop_id, prop_value] : rhs.ValueMap()) {
|
||||
auto key = context->db_accessor->NameToProperty(prop_id);
|
||||
new_properties.emplace(key, prop_value);
|
||||
}
|
||||
update_props(new_properties);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
@ -2860,7 +2853,6 @@ bool SetProperties::SetPropertiesCursor::Pull(Frame &frame, ExecutionContext &co
|
||||
throw QueryRuntimeException("Vertex properties not set due to not having enough permission!");
|
||||
}
|
||||
#endif
|
||||
|
||||
SetPropertiesOnRecord(&lhs.ValueVertex(), rhs, self_.op_, &context);
|
||||
break;
|
||||
case TypedValue::Type::Edge:
|
||||
|
@ -174,6 +174,10 @@ struct Delta {
|
||||
uint64_t command_id)
|
||||
: action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, value}) {}
|
||||
|
||||
Delta(SetPropertyTag /*tag*/, PropertyId key, PropertyValue &&value, std::atomic<uint64_t> *timestamp,
|
||||
uint64_t command_id)
|
||||
: action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, std::move(value)}) {}
|
||||
|
||||
Delta(AddInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
||||
uint64_t command_id)
|
||||
: action(Action::ADD_IN_EDGE),
|
||||
|
@ -12,11 +12,13 @@
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <tuple>
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/result.hpp"
|
||||
#include "storage/v2/vertex_accessor.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
@ -145,6 +147,26 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
|
||||
return true;
|
||||
}
|
||||
|
||||
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAccessor::UpdateProperties(
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;
|
||||
|
||||
std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;
|
||||
|
||||
if (edge_.ptr->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
auto id_old_new_change = edge_.ptr->properties.UpdateProperties(properties);
|
||||
|
||||
for (auto &[property, old_value, new_value] : id_old_new_change) {
|
||||
CreateAndLinkDelta(transaction_, edge_.ptr, Delta::SetPropertyTag(), property, std::move(old_value));
|
||||
}
|
||||
|
||||
return id_old_new_change;
|
||||
}
|
||||
|
||||
Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
|
||||
if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;
|
||||
|
||||
|
@ -63,6 +63,9 @@ class EdgeAccessor final {
|
||||
/// @throw std::bad_alloc
|
||||
Result<bool> InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties);
|
||||
|
||||
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> UpdateProperties(
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties) const;
|
||||
|
||||
/// Remove all properties and return old values for each removed property.
|
||||
/// @throw std::bad_alloc
|
||||
Result<std::map<PropertyId, PropertyValue>> ClearProperties();
|
||||
|
@ -1238,6 +1238,32 @@ bool PropertyStore::DoInitProperties(const TContainer &properties) {
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> PropertyStore::UpdateProperties(
|
||||
std::map<PropertyId, PropertyValue> &properties) {
|
||||
auto old_properties = Properties();
|
||||
ClearProperties();
|
||||
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> id_old_new_change;
|
||||
id_old_new_change.reserve(properties.size() + old_properties.size());
|
||||
for (const auto &[prop_id, new_value] : properties) {
|
||||
if (!old_properties.contains(prop_id)) {
|
||||
id_old_new_change.emplace_back(std::make_tuple(prop_id, PropertyValue(), new_value));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto &[old_key, old_value] : old_properties) {
|
||||
auto [it, inserted] = properties.emplace(old_key, old_value);
|
||||
if (!inserted) {
|
||||
auto &new_value = it->second;
|
||||
id_old_new_change.emplace_back(std::make_tuple(it->first, old_value, new_value));
|
||||
}
|
||||
}
|
||||
|
||||
MG_ASSERT(InitProperties(properties));
|
||||
return id_old_new_change;
|
||||
}
|
||||
|
||||
template bool PropertyStore::DoInitProperties<std::map<PropertyId, PropertyValue>>(
|
||||
const std::map<PropertyId, PropertyValue> &);
|
||||
template bool PropertyStore::DoInitProperties<std::vector<std::pair<PropertyId, PropertyValue>>>(
|
||||
|
@ -91,6 +91,14 @@ class PropertyStore {
|
||||
/// @throw std::bad_alloc
|
||||
bool InitProperties(std::vector<std::pair<storage::PropertyId, storage::PropertyValue>> properties);
|
||||
|
||||
/// Update property values in property store with sent properties. Returns vector of changed
|
||||
/// properties. Each tuple inside vector consists of PropertyId of inserted property, together with old
|
||||
/// property (if existed or empty PropertyValue if didn't exist) and new property which was inserted.
|
||||
/// The time complexity of this function is O(n*log(n)):
|
||||
/// @throw std::bad_alloc
|
||||
std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>> UpdateProperties(
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties);
|
||||
|
||||
/// Remove all properties and return `true` if any removal took place.
|
||||
/// `false` is returned if there were no properties to remove. The time
|
||||
/// complexity of this function is O(1).
|
||||
|
@ -12,12 +12,15 @@
|
||||
#include "storage/v2/vertex_accessor.hpp"
|
||||
|
||||
#include <memory>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/id_types.hpp"
|
||||
#include "storage/v2/indices/indices.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/result.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/memory_tracker.hpp"
|
||||
|
||||
@ -254,6 +257,25 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
|
||||
return true;
|
||||
}
|
||||
|
||||
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
|
||||
utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
|
||||
|
||||
if (vertex_->deleted) return Error::DELETED_OBJECT;
|
||||
|
||||
auto id_old_new_change = vertex_->properties.UpdateProperties(properties);
|
||||
|
||||
for (auto &[id, old_value, new_value] : id_old_new_change) {
|
||||
indices_->UpdateOnSetProperty(id, new_value, vertex_, *transaction_);
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), id, std::move(old_value));
|
||||
}
|
||||
|
||||
return id_old_new_change;
|
||||
}
|
||||
|
||||
Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
|
||||
|
@ -73,6 +73,9 @@ class VertexAccessor final {
|
||||
/// @throw std::bad_alloc
|
||||
Result<bool> InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties);
|
||||
|
||||
Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> UpdateProperties(
|
||||
std::map<storage::PropertyId, storage::PropertyValue> &properties) const;
|
||||
|
||||
/// Remove all properties and return the values of the removed properties.
|
||||
/// @throw std::bad_alloc
|
||||
Result<std::map<PropertyId, PropertyValue>> ClearProperties();
|
||||
|
@ -1678,6 +1678,41 @@ TYPED_TEST(QueryPlanTest, SetPropertiesOnNull) {
|
||||
EXPECT_EQ(1, PullAll(*set_op, &context));
|
||||
}
|
||||
|
||||
TYPED_TEST(QueryPlanTest, UpdateSetPropertiesFromMap) {
|
||||
auto storage_dba = this->db->Access();
|
||||
memgraph::query::DbAccessor dba(storage_dba.get());
|
||||
// Add a single vertex. ( {property: 43})
|
||||
auto vertex_accessor = dba.InsertVertex();
|
||||
auto old_value = vertex_accessor.SetProperty(dba.NameToProperty("property"), memgraph::storage::PropertyValue{43});
|
||||
EXPECT_EQ(old_value.HasError(), false);
|
||||
EXPECT_EQ(*old_value, memgraph::storage::PropertyValue());
|
||||
dba.AdvanceCommand();
|
||||
EXPECT_EQ(1, CountIterable(dba.Vertices(memgraph::storage::View::OLD)));
|
||||
SymbolTable symbol_table;
|
||||
// MATCH (n) SET n += {property: "updated", new_property:"a"}
|
||||
auto n = MakeScanAll(this->storage, symbol_table, "n");
|
||||
|
||||
auto prop_property = PROPERTY_PAIR(dba, "property");
|
||||
auto prop_new_property = PROPERTY_PAIR(dba, "new_property");
|
||||
|
||||
std::unordered_map<PropertyIx, Expression *> prop_map;
|
||||
prop_map.emplace(this->storage.GetPropertyIx(prop_property.first), LITERAL("updated"));
|
||||
prop_map.emplace(this->storage.GetPropertyIx(prop_new_property.first), LITERAL("a"));
|
||||
auto *rhs = this->storage.template Create<MapLiteral>(prop_map);
|
||||
|
||||
auto op_type{plan::SetProperties::Op::UPDATE};
|
||||
auto set_op = std::make_shared<plan::SetProperties>(n.op_, n.sym_, rhs, op_type);
|
||||
auto context = MakeContext(this->storage, symbol_table, &dba);
|
||||
PullAll(*set_op, &context);
|
||||
dba.AdvanceCommand();
|
||||
auto new_properties = vertex_accessor.Properties(memgraph::storage::View::OLD);
|
||||
std::map<memgraph::storage::PropertyId, memgraph::storage::PropertyValue> expected_properties;
|
||||
expected_properties.emplace(dba.NameToProperty("property"), memgraph::storage::PropertyValue("updated"));
|
||||
expected_properties.emplace(dba.NameToProperty("new_property"), memgraph::storage::PropertyValue("a"));
|
||||
EXPECT_EQ(new_properties.HasError(), false);
|
||||
EXPECT_EQ(*new_properties, expected_properties);
|
||||
}
|
||||
|
||||
TYPED_TEST(QueryPlanTest, SetLabelsOnNull) {
|
||||
// OPTIONAL MATCH (n) SET n :label
|
||||
auto storage_dba = this->db->Access();
|
||||
|
Loading…
Reference in New Issue
Block a user