Improve memory handling of Deltas (#1688)
- Reduce delta from 104B to 80B - Hold and pass them around as in a deque - Detect and deleted deltas within commit if safe to do so
This commit is contained in:
parent
7ead00f23e
commit
4ef6a1f9c3
@ -270,6 +270,8 @@ pushd jemalloc
|
|||||||
MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
|
MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
|
||||||
./configure \
|
./configure \
|
||||||
--disable-cxx \
|
--disable-cxx \
|
||||||
|
--with-lg-page=12 \
|
||||||
|
--with-lg-hugepage=21 \
|
||||||
--enable-shared=no --prefix=$working_dir \
|
--enable-shared=no --prefix=$working_dir \
|
||||||
--with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
|
--with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
|
||||||
|
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -57,9 +57,11 @@ class PreviousPtr {
|
|||||||
explicit Pointer(Edge *edge) : type(Type::EDGE), edge(edge) {}
|
explicit Pointer(Edge *edge) : type(Type::EDGE), edge(edge) {}
|
||||||
|
|
||||||
Type type{Type::NULLPTR};
|
Type type{Type::NULLPTR};
|
||||||
Delta *delta{nullptr};
|
union {
|
||||||
Vertex *vertex{nullptr};
|
Delta *delta = nullptr;
|
||||||
Edge *edge{nullptr};
|
Vertex *vertex;
|
||||||
|
Edge *edge;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
PreviousPtr() : storage_(0) {}
|
PreviousPtr() : storage_(0) {}
|
||||||
@ -157,59 +159,51 @@ struct Delta {
|
|||||||
// DELETE_DESERIALIZED_OBJECT is used to load data from disk committed by past txs.
|
// DELETE_DESERIALIZED_OBJECT is used to load data from disk committed by past txs.
|
||||||
// Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from
|
// Because of this object was created in past txs, we create timestamp by ourselves inside instead of having it from
|
||||||
// current tx. This timestamp we got from RocksDB timestamp stored in key.
|
// current tx. This timestamp we got from RocksDB timestamp stored in key.
|
||||||
Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, const std::optional<std::string> &old_disk_key)
|
Delta(DeleteDeserializedObjectTag /*tag*/, uint64_t ts, std::optional<std::string> old_disk_key)
|
||||||
: action(Action::DELETE_DESERIALIZED_OBJECT),
|
: timestamp(new std::atomic<uint64_t>(ts)), command_id(0), old_disk_key{.value = std::move(old_disk_key)} {}
|
||||||
timestamp(new std::atomic<uint64_t>(ts)),
|
|
||||||
command_id(0),
|
|
||||||
old_disk_key(old_disk_key) {}
|
|
||||||
|
|
||||||
Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
Delta(DeleteObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||||
: action(Action::DELETE_OBJECT), timestamp(timestamp), command_id(command_id) {}
|
: timestamp(timestamp), command_id(command_id), action(Action::DELETE_OBJECT) {}
|
||||||
|
|
||||||
Delta(RecreateObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
Delta(RecreateObjectTag /*tag*/, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||||
: action(Action::RECREATE_OBJECT), timestamp(timestamp), command_id(command_id) {}
|
: timestamp(timestamp), command_id(command_id), action(Action::RECREATE_OBJECT) {}
|
||||||
|
|
||||||
Delta(AddLabelTag /*tag*/, LabelId label, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
Delta(AddLabelTag /*tag*/, LabelId label, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||||
: action(Action::ADD_LABEL), timestamp(timestamp), command_id(command_id), label(label) {}
|
: timestamp(timestamp), command_id(command_id), label{.action = Action::ADD_LABEL, .value = label} {}
|
||||||
|
|
||||||
Delta(RemoveLabelTag /*tag*/, LabelId label, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
Delta(RemoveLabelTag /*tag*/, LabelId label, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||||
: action(Action::REMOVE_LABEL), timestamp(timestamp), command_id(command_id), label(label) {}
|
: timestamp(timestamp), command_id(command_id), label{.action = Action::REMOVE_LABEL, .value = label} {}
|
||||||
|
|
||||||
Delta(SetPropertyTag /*tag*/, PropertyId key, const PropertyValue &value, std::atomic<uint64_t> *timestamp,
|
Delta(SetPropertyTag /*tag*/, PropertyId key, PropertyValue value, std::atomic<uint64_t> *timestamp,
|
||||||
uint64_t command_id)
|
uint64_t command_id)
|
||||||
: action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, value}) {}
|
: timestamp(timestamp),
|
||||||
|
command_id(command_id),
|
||||||
Delta(SetPropertyTag /*tag*/, PropertyId key, PropertyValue &&value, std::atomic<uint64_t> *timestamp,
|
property{
|
||||||
uint64_t command_id)
|
.action = Action::SET_PROPERTY, .key = key, .value = std::make_unique<PropertyValue>(std::move(value))} {}
|
||||||
: action(Action::SET_PROPERTY), timestamp(timestamp), command_id(command_id), property({key, std::move(value)}) {}
|
|
||||||
|
|
||||||
Delta(AddInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
Delta(AddInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
||||||
uint64_t command_id)
|
uint64_t command_id)
|
||||||
: action(Action::ADD_IN_EDGE),
|
: timestamp(timestamp),
|
||||||
timestamp(timestamp),
|
|
||||||
command_id(command_id),
|
command_id(command_id),
|
||||||
vertex_edge({edge_type, vertex, edge}) {}
|
vertex_edge{.action = Action::ADD_IN_EDGE, .edge_type = edge_type, vertex, edge} {}
|
||||||
|
|
||||||
Delta(AddOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
Delta(AddOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
||||||
uint64_t command_id)
|
uint64_t command_id)
|
||||||
: action(Action::ADD_OUT_EDGE),
|
: timestamp(timestamp),
|
||||||
timestamp(timestamp),
|
|
||||||
command_id(command_id),
|
command_id(command_id),
|
||||||
vertex_edge({edge_type, vertex, edge}) {}
|
vertex_edge{.action = Action::ADD_OUT_EDGE, .edge_type = edge_type, vertex, edge} {}
|
||||||
|
|
||||||
Delta(RemoveInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
Delta(RemoveInEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
||||||
uint64_t command_id)
|
uint64_t command_id)
|
||||||
: action(Action::REMOVE_IN_EDGE),
|
: timestamp(timestamp),
|
||||||
timestamp(timestamp),
|
|
||||||
command_id(command_id),
|
command_id(command_id),
|
||||||
vertex_edge({edge_type, vertex, edge}) {}
|
vertex_edge{.action = Action::REMOVE_IN_EDGE, .edge_type = edge_type, vertex, edge} {}
|
||||||
|
|
||||||
Delta(RemoveOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
Delta(RemoveOutEdgeTag /*tag*/, EdgeTypeId edge_type, Vertex *vertex, EdgeRef edge, std::atomic<uint64_t> *timestamp,
|
||||||
uint64_t command_id)
|
uint64_t command_id)
|
||||||
: action(Action::REMOVE_OUT_EDGE),
|
: timestamp(timestamp),
|
||||||
timestamp(timestamp),
|
|
||||||
command_id(command_id),
|
command_id(command_id),
|
||||||
vertex_edge({edge_type, vertex, edge}) {}
|
vertex_edge{.action = Action::REMOVE_OUT_EDGE, .edge_type = edge_type, vertex, edge} {}
|
||||||
|
|
||||||
Delta(const Delta &) = delete;
|
Delta(const Delta &) = delete;
|
||||||
Delta(Delta &&) = delete;
|
Delta(Delta &&) = delete;
|
||||||
@ -228,18 +222,16 @@ struct Delta {
|
|||||||
case Action::REMOVE_OUT_EDGE:
|
case Action::REMOVE_OUT_EDGE:
|
||||||
break;
|
break;
|
||||||
case Action::DELETE_DESERIALIZED_OBJECT:
|
case Action::DELETE_DESERIALIZED_OBJECT:
|
||||||
old_disk_key.reset();
|
old_disk_key.value.reset();
|
||||||
delete timestamp;
|
delete timestamp;
|
||||||
timestamp = nullptr;
|
timestamp = nullptr;
|
||||||
break;
|
break;
|
||||||
case Action::SET_PROPERTY:
|
case Action::SET_PROPERTY:
|
||||||
property.value.~PropertyValue();
|
property.value.reset();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Action action;
|
|
||||||
|
|
||||||
// TODO: optimize with in-place copy
|
// TODO: optimize with in-place copy
|
||||||
std::atomic<uint64_t> *timestamp;
|
std::atomic<uint64_t> *timestamp;
|
||||||
uint64_t command_id;
|
uint64_t command_id;
|
||||||
@ -247,13 +239,22 @@ struct Delta {
|
|||||||
std::atomic<Delta *> next{nullptr};
|
std::atomic<Delta *> next{nullptr};
|
||||||
|
|
||||||
union {
|
union {
|
||||||
std::optional<std::string> old_disk_key;
|
Action action;
|
||||||
LabelId label;
|
|
||||||
struct {
|
struct {
|
||||||
|
Action action = Action::DELETE_DESERIALIZED_OBJECT;
|
||||||
|
std::optional<std::string> value;
|
||||||
|
} old_disk_key;
|
||||||
|
struct {
|
||||||
|
Action action;
|
||||||
|
LabelId value;
|
||||||
|
} label;
|
||||||
|
struct {
|
||||||
|
Action action;
|
||||||
PropertyId key;
|
PropertyId key;
|
||||||
storage::PropertyValue value;
|
std::unique_ptr<storage::PropertyValue> value;
|
||||||
} property;
|
} property;
|
||||||
struct {
|
struct {
|
||||||
|
Action action;
|
||||||
EdgeTypeId edge_type;
|
EdgeTypeId edge_type;
|
||||||
Vertex *vertex;
|
Vertex *vertex;
|
||||||
EdgeRef edge;
|
EdgeRef edge;
|
||||||
|
@ -137,14 +137,14 @@ bool VertexHasLabel(const Vertex &vertex, LabelId label, Transaction *transactio
|
|||||||
ApplyDeltasForRead(transaction, delta, view, [&deleted, &has_label, label](const Delta &delta) {
|
ApplyDeltasForRead(transaction, delta, view, [&deleted, &has_label, label](const Delta &delta) {
|
||||||
switch (delta.action) {
|
switch (delta.action) {
|
||||||
case Delta::Action::REMOVE_LABEL: {
|
case Delta::Action::REMOVE_LABEL: {
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::ADD_LABEL: {
|
case Delta::Action::ADD_LABEL: {
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
}
|
}
|
||||||
@ -177,7 +177,7 @@ PropertyValue GetVertexProperty(const Vertex &vertex, PropertyId property, Trans
|
|||||||
switch (delta.action) {
|
switch (delta.action) {
|
||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
if (delta.property.key == property) {
|
if (delta.property.key == property) {
|
||||||
value = delta.property.value;
|
value = *delta.property.value;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1682,9 +1682,9 @@ utils::BasicResult<StorageManipulationError, void> DiskStorage::DiskAccessor::Co
|
|||||||
} break;
|
} break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (transaction_.deltas.use().empty() ||
|
} else if (transaction_.deltas.empty() ||
|
||||||
(!edge_import_mode_active &&
|
(!edge_import_mode_active &&
|
||||||
std::all_of(transaction_.deltas.use().begin(), transaction_.deltas.use().end(), [](const Delta &delta) {
|
std::all_of(transaction_.deltas.begin(), transaction_.deltas.end(), [](const Delta &delta) {
|
||||||
return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT;
|
return delta.action == Delta::Action::DELETE_DESERIALIZED_OBJECT;
|
||||||
}))) {
|
}))) {
|
||||||
} else {
|
} else {
|
||||||
@ -1812,7 +1812,7 @@ void DiskStorage::DiskAccessor::UpdateObjectsCountOnAbort() {
|
|||||||
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
auto *disk_storage = static_cast<DiskStorage *>(storage_);
|
||||||
uint64_t transaction_id = transaction_.transaction_id;
|
uint64_t transaction_id = transaction_.transaction_id;
|
||||||
|
|
||||||
for (const auto &delta : transaction_.deltas.use()) {
|
for (const auto &delta : transaction_.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
switch (prev.type) {
|
switch (prev.type) {
|
||||||
case PreviousPtr::Type::VERTEX: {
|
case PreviousPtr::Type::VERTEX: {
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -580,7 +580,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, SalientConf
|
|||||||
case Delta::Action::REMOVE_LABEL: {
|
case Delta::Action::REMOVE_LABEL: {
|
||||||
encoder->WriteMarker(VertexActionToMarker(delta.action));
|
encoder->WriteMarker(VertexActionToMarker(delta.action));
|
||||||
encoder->WriteUint(vertex.gid.AsUint());
|
encoder->WriteUint(vertex.gid.AsUint());
|
||||||
encoder->WriteString(name_id_mapper->IdToName(delta.label.AsUint()));
|
encoder->WriteString(name_id_mapper->IdToName(delta.label.value.AsUint()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::ADD_OUT_EDGE:
|
case Delta::Action::ADD_OUT_EDGE:
|
||||||
|
@ -237,7 +237,7 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
|
|||||||
switch (delta.action) {
|
switch (delta.action) {
|
||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
if (delta.property.key == property) {
|
if (delta.property.key == property) {
|
||||||
*value = delta.property.value;
|
*value = *delta.property.value;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -281,15 +281,15 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::Properties(View view)
|
|||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
auto it = properties.find(delta.property.key);
|
auto it = properties.find(delta.property.key);
|
||||||
if (it != properties.end()) {
|
if (it != properties.end()) {
|
||||||
if (delta.property.value.IsNull()) {
|
if (delta.property.value->IsNull()) {
|
||||||
// remove the property
|
// remove the property
|
||||||
properties.erase(it);
|
properties.erase(it);
|
||||||
} else {
|
} else {
|
||||||
// set the value
|
// set the value
|
||||||
it->second = delta.property.value;
|
it->second = *delta.property.value;
|
||||||
}
|
}
|
||||||
} else if (!delta.property.value.IsNull()) {
|
} else if (!delta.property.value->IsNull()) {
|
||||||
properties.emplace(delta.property.key, delta.property.value);
|
properties.emplace(delta.property.key, *delta.property.value);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -23,24 +23,24 @@
|
|||||||
|
|
||||||
namespace memgraph::storage {
|
namespace memgraph::storage {
|
||||||
|
|
||||||
#define STORAGE_DEFINE_ID_TYPE(name) \
|
#define STORAGE_DEFINE_ID_TYPE(name, type_store, type_conv, parse) \
|
||||||
class name final { \
|
class name final { \
|
||||||
private: \
|
private: \
|
||||||
explicit name(uint64_t id) : id_(id) {} \
|
explicit name(type_store id) : id_(id) {} \
|
||||||
\
|
\
|
||||||
public: \
|
public: \
|
||||||
/* Default constructor to allow serialization or preallocation. */ \
|
/* Default constructor to allow serialization or preallocation. */ \
|
||||||
name() = default; \
|
name() = default; \
|
||||||
\
|
\
|
||||||
static name FromUint(uint64_t id) { return name{id}; } \
|
static name FromUint(type_store id) { return name{id}; } \
|
||||||
static name FromInt(int64_t id) { return name{utils::MemcpyCast<uint64_t>(id)}; } \
|
static name FromInt(type_conv id) { return name{utils::MemcpyCast<type_store>(id)}; } \
|
||||||
uint64_t AsUint() const { return id_; } \
|
type_store AsUint() const { return id_; } \
|
||||||
int64_t AsInt() const { return utils::MemcpyCast<int64_t>(id_); } \
|
type_conv AsInt() const { return utils::MemcpyCast<type_conv>(id_); } \
|
||||||
static name FromString(std::string_view id) { return name{utils::ParseStringToUint64(id)}; } \
|
static name FromString(std::string_view id) { return name{parse(id)}; } \
|
||||||
std::string ToString() const { return std::to_string(id_); } \
|
std::string ToString() const { return std::to_string(id_); } \
|
||||||
\
|
\
|
||||||
private: \
|
private: \
|
||||||
uint64_t id_; \
|
type_store id_; \
|
||||||
}; \
|
}; \
|
||||||
static_assert(std::is_trivially_copyable_v<name>, "storage::" #name " must be trivially copyable!"); \
|
static_assert(std::is_trivially_copyable_v<name>, "storage::" #name " must be trivially copyable!"); \
|
||||||
inline bool operator==(const name &first, const name &second) { return first.AsUint() == second.AsUint(); } \
|
inline bool operator==(const name &first, const name &second) { return first.AsUint() == second.AsUint(); } \
|
||||||
@ -50,10 +50,10 @@ namespace memgraph::storage {
|
|||||||
inline bool operator<=(const name &first, const name &second) { return first.AsUint() <= second.AsUint(); } \
|
inline bool operator<=(const name &first, const name &second) { return first.AsUint() <= second.AsUint(); } \
|
||||||
inline bool operator>=(const name &first, const name &second) { return first.AsUint() >= second.AsUint(); }
|
inline bool operator>=(const name &first, const name &second) { return first.AsUint() >= second.AsUint(); }
|
||||||
|
|
||||||
STORAGE_DEFINE_ID_TYPE(Gid);
|
STORAGE_DEFINE_ID_TYPE(Gid, uint64_t, int64_t, utils::ParseStringToUint64);
|
||||||
STORAGE_DEFINE_ID_TYPE(LabelId);
|
STORAGE_DEFINE_ID_TYPE(LabelId, uint32_t, int32_t, utils::ParseStringToUint32);
|
||||||
STORAGE_DEFINE_ID_TYPE(PropertyId);
|
STORAGE_DEFINE_ID_TYPE(PropertyId, uint32_t, int32_t, utils::ParseStringToUint32);
|
||||||
STORAGE_DEFINE_ID_TYPE(EdgeTypeId);
|
STORAGE_DEFINE_ID_TYPE(EdgeTypeId, uint32_t, int32_t, utils::ParseStringToUint32);
|
||||||
|
|
||||||
#undef STORAGE_DEFINE_ID_TYPE
|
#undef STORAGE_DEFINE_ID_TYPE
|
||||||
|
|
||||||
|
@ -72,13 +72,13 @@ inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t tim
|
|||||||
return AnyVersionSatisfiesPredicate<interesting>(timestamp, delta, [&has_label, &deleted, label](const Delta &delta) {
|
return AnyVersionSatisfiesPredicate<interesting>(timestamp, delta, [&has_label, &deleted, label](const Delta &delta) {
|
||||||
switch (delta.action) {
|
switch (delta.action) {
|
||||||
case Delta::Action::ADD_LABEL:
|
case Delta::Action::ADD_LABEL:
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Delta::Action::REMOVE_LABEL:
|
case Delta::Action::REMOVE_LABEL:
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
}
|
}
|
||||||
@ -135,20 +135,20 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop
|
|||||||
timestamp, delta, [&has_label, ¤t_value_equal_to_value, &deleted, label, key, &value](const Delta &delta) {
|
timestamp, delta, [&has_label, ¤t_value_equal_to_value, &deleted, label, key, &value](const Delta &delta) {
|
||||||
switch (delta.action) {
|
switch (delta.action) {
|
||||||
case Delta::Action::ADD_LABEL:
|
case Delta::Action::ADD_LABEL:
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Delta::Action::REMOVE_LABEL:
|
case Delta::Action::REMOVE_LABEL:
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Delta::Action::SET_PROPERTY:
|
case Delta::Action::SET_PROPERTY:
|
||||||
if (delta.property.key == key) {
|
if (delta.property.key == key) {
|
||||||
current_value_equal_to_value = delta.property.value == value;
|
current_value_equal_to_value = *delta.property.value == value;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Delta::Action::RECREATE_OBJECT: {
|
case Delta::Action::RECREATE_OBJECT: {
|
||||||
|
@ -176,8 +176,8 @@ InMemoryStorage::~InMemoryStorage() {
|
|||||||
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
|
committed_transactions_.WithLock([](auto &transactions) { transactions.clear(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(auto tag, InMemoryStorage *storage, IsolationLevel isolation_level,
|
InMemoryStorage::InMemoryAccessor::InMemoryAccessor(
|
||||||
StorageMode storage_mode,
|
auto tag, InMemoryStorage *storage, IsolationLevel isolation_level, StorageMode storage_mode,
|
||||||
memgraph::replication_coordination_glue::ReplicationRole replication_role)
|
memgraph::replication_coordination_glue::ReplicationRole replication_role)
|
||||||
: Accessor(tag, storage, isolation_level, storage_mode, replication_role),
|
: Accessor(tag, storage, isolation_level, storage_mode, replication_role),
|
||||||
config_(storage->config_.salient.items) {}
|
config_(storage->config_.salient.items) {}
|
||||||
@ -757,7 +757,7 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
|
|||||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||||
|
|
||||||
// TODO: duplicated transaction finalisation in md_deltas and deltas processing cases
|
// TODO: duplicated transaction finalisation in md_deltas and deltas processing cases
|
||||||
if (transaction_.deltas.use().empty() && transaction_.md_deltas.empty()) {
|
if (transaction_.deltas.empty() && transaction_.md_deltas.empty()) {
|
||||||
// We don't have to update the commit timestamp here because no one reads
|
// We don't have to update the commit timestamp here because no one reads
|
||||||
// it.
|
// it.
|
||||||
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
|
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
|
||||||
@ -836,25 +836,37 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
|
|||||||
// Replica can log only the write transaction received from Main
|
// Replica can log only the write transaction received from Main
|
||||||
// so the Wal files are consistent
|
// so the Wal files are consistent
|
||||||
if (is_main_or_replica_write) {
|
if (is_main_or_replica_write) {
|
||||||
could_replicate_all_sync_replicas = mem_storage->AppendToWal(transaction_, *commit_timestamp_,
|
could_replicate_all_sync_replicas =
|
||||||
std::move(db_acc)); // protected by engine_guard
|
mem_storage->AppendToWal(transaction_, *commit_timestamp_, std::move(db_acc));
|
||||||
|
|
||||||
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
|
// TODO: release lock, and update all deltas to have a local copy of the commit timestamp
|
||||||
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
|
MG_ASSERT(transaction_.commit_timestamp != nullptr, "Invalid database state!");
|
||||||
transaction_.commit_timestamp->store(*commit_timestamp_,
|
transaction_.commit_timestamp->store(*commit_timestamp_, std::memory_order_release);
|
||||||
std::memory_order_release); // protected by engine_guard
|
|
||||||
// Replica can only update the last commit timestamp with
|
// Replica can only update the last commit timestamp with
|
||||||
// the commits received from main.
|
// the commits received from main.
|
||||||
// Update the last commit timestamp
|
// Update the last commit timestamp
|
||||||
mem_storage->repl_storage_state_.last_commit_timestamp_.store(
|
mem_storage->repl_storage_state_.last_commit_timestamp_.store(*commit_timestamp_);
|
||||||
*commit_timestamp_); // protected by engine_guard
|
|
||||||
}
|
}
|
||||||
// Release engine lock because we don't have to hold it anymore
|
|
||||||
engine_guard.unlock();
|
|
||||||
|
|
||||||
|
// TODO: can and should this be moved earlier?
|
||||||
mem_storage->commit_log_->MarkFinished(start_timestamp);
|
mem_storage->commit_log_->MarkFinished(start_timestamp);
|
||||||
|
|
||||||
|
// while still holding engine lock
|
||||||
|
// and after durability + replication
|
||||||
|
// check if we can fast discard deltas (ie. do not hand over to GC)
|
||||||
|
bool no_older_transactions = mem_storage->commit_log_->OldestActive() == *commit_timestamp_;
|
||||||
|
bool no_newer_transactions = mem_storage->transaction_id_ == transaction_.transaction_id + 1;
|
||||||
|
if (no_older_transactions && no_newer_transactions) [[unlikely]] {
|
||||||
|
// STEP 0) Can only do fast discard if GC is not running
|
||||||
|
// We can't unlink our transcations deltas until all of the older deltas in GC have been unlinked
|
||||||
|
// must do a try here, to avoid deadlock between transactions `engine_lock_` and the GC `gc_lock_`
|
||||||
|
auto gc_guard = std::unique_lock{mem_storage->gc_lock_, std::defer_lock};
|
||||||
|
if (gc_guard.try_lock()) {
|
||||||
|
FastDiscardOfDeltas(*commit_timestamp_, std::move(gc_guard));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
} // Release engine lock because we don't have to hold it anymore
|
||||||
|
|
||||||
if (unique_constraint_violation) {
|
if (unique_constraint_violation) {
|
||||||
Abort();
|
Abort();
|
||||||
@ -873,19 +885,100 @@ utils::BasicResult<StorageManipulationError, void> InMemoryStorage::InMemoryAcce
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void InMemoryStorage::InMemoryAccessor::FastDiscardOfDeltas(uint64_t oldest_active_timestamp,
|
||||||
|
std::unique_lock<std::mutex> /*gc_guard*/) {
|
||||||
|
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||||
|
std::list<Gid> current_deleted_edges;
|
||||||
|
std::list<Gid> current_deleted_vertices;
|
||||||
|
|
||||||
|
auto const unlink_remove_clear = [&](std::deque<Delta> &deltas) {
|
||||||
|
for (auto &delta : deltas) {
|
||||||
|
auto prev = delta.prev.Get();
|
||||||
|
switch (prev.type) {
|
||||||
|
case PreviousPtr::Type::NULLPTR:
|
||||||
|
case PreviousPtr::Type::DELTA:
|
||||||
|
break;
|
||||||
|
case PreviousPtr::Type::VERTEX: {
|
||||||
|
// safe because no other txn can be reading this while we have engine lock
|
||||||
|
auto &vertex = *prev.vertex;
|
||||||
|
vertex.delta = nullptr;
|
||||||
|
if (vertex.deleted) {
|
||||||
|
DMG_ASSERT(delta.action == Delta::Action::RECREATE_OBJECT);
|
||||||
|
current_deleted_vertices.push_back(vertex.gid);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case PreviousPtr::Type::EDGE: {
|
||||||
|
// safe because no other txn can be reading this while we have engine lock
|
||||||
|
auto &edge = *prev.edge;
|
||||||
|
edge.delta = nullptr;
|
||||||
|
if (edge.deleted) {
|
||||||
|
DMG_ASSERT(delta.action == Delta::Action::RECREATE_OBJECT);
|
||||||
|
current_deleted_edges.push_back(edge.gid);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// delete deltas
|
||||||
|
deltas.clear();
|
||||||
|
};
|
||||||
|
|
||||||
|
// STEP 1) ensure everything in GC is gone
|
||||||
|
|
||||||
|
// 1.a) old garbage_undo_buffers are safe to remove
|
||||||
|
// we are the only transaction, no one is reading those unlinked deltas
|
||||||
|
mem_storage->garbage_undo_buffers_.WithLock([&](auto &garbage_undo_buffers) { garbage_undo_buffers.clear(); });
|
||||||
|
|
||||||
|
// 1.b.0) old committed_transactions_ need mininal unlinking + remove + clear
|
||||||
|
// must be done before this transactions delta unlinking
|
||||||
|
auto linked_undo_buffers = std::list<GCDeltas>{};
|
||||||
|
mem_storage->committed_transactions_.WithLock(
|
||||||
|
[&](auto &committed_transactions) { committed_transactions.swap(linked_undo_buffers); });
|
||||||
|
|
||||||
|
// 1.b.1) unlink, gathering the removals
|
||||||
|
for (auto &gc_deltas : linked_undo_buffers) {
|
||||||
|
unlink_remove_clear(gc_deltas.deltas_);
|
||||||
|
}
|
||||||
|
// 1.b.2) clear the list of deltas deques
|
||||||
|
linked_undo_buffers.clear();
|
||||||
|
|
||||||
|
// STEP 2) this transactions deltas also mininal unlinking + remove + clear
|
||||||
|
unlink_remove_clear(transaction_.deltas);
|
||||||
|
|
||||||
|
// STEP 3) skip_list removals
|
||||||
|
if (!current_deleted_vertices.empty()) {
|
||||||
|
// 3.a) clear from indexes first
|
||||||
|
std::stop_source dummy;
|
||||||
|
mem_storage->indices_.RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
|
||||||
|
auto *mem_unique_constraints =
|
||||||
|
static_cast<InMemoryUniqueConstraints *>(mem_storage->constraints_.unique_constraints_.get());
|
||||||
|
mem_unique_constraints->RemoveObsoleteEntries(oldest_active_timestamp, dummy.get_token());
|
||||||
|
|
||||||
|
// 3.b) remove from veretex skip_list
|
||||||
|
auto vertex_acc = mem_storage->vertices_.access();
|
||||||
|
for (auto gid : current_deleted_vertices) {
|
||||||
|
vertex_acc.remove(gid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!current_deleted_edges.empty()) {
|
||||||
|
// 3.c) remove from edge skip_list
|
||||||
|
auto edge_acc = mem_storage->edges_.access();
|
||||||
|
for (auto gid : current_deleted_edges) {
|
||||||
|
edge_acc.remove(gid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void InMemoryStorage::InMemoryAccessor::Abort() {
|
void InMemoryStorage::InMemoryAccessor::Abort() {
|
||||||
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
|
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
|
||||||
|
|
||||||
// We collect vertices and edges we've created here and then splice them into
|
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||||
// `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one
|
|
||||||
// by one and acquiring lock every time.
|
|
||||||
std::list<Gid> my_deleted_vertices;
|
|
||||||
std::list<Gid> my_deleted_edges;
|
|
||||||
|
|
||||||
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
|
|
||||||
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
|
|
||||||
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
|
|
||||||
|
|
||||||
|
// if we have no deltas then no need to do any undo work during Abort
|
||||||
|
// note: this check also saves on unnecessary contention on `engine_lock_`
|
||||||
|
if (!transaction_.deltas.empty()) {
|
||||||
// CONSTRAINTS
|
// CONSTRAINTS
|
||||||
if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) {
|
if (transaction_.constraint_verification_info.NeedsUniqueConstraintVerification()) {
|
||||||
// Need to remove elements from constraints before handling of the deltas, so the elements match the correct
|
// Need to remove elements from constraints before handling of the deltas, so the elements match the correct
|
||||||
@ -897,7 +990,17 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
|
|
||||||
const auto index_stats = storage_->indices_.Analysis();
|
const auto index_stats = storage_->indices_.Analysis();
|
||||||
|
|
||||||
for (const auto &delta : transaction_.deltas.use()) {
|
// We collect vertices and edges we've created here and then splice them into
|
||||||
|
// `deleted_vertices_` and `deleted_edges_` lists, instead of adding them one
|
||||||
|
// by one and acquiring lock every time.
|
||||||
|
std::list<Gid> my_deleted_vertices;
|
||||||
|
std::list<Gid> my_deleted_edges;
|
||||||
|
|
||||||
|
std::map<LabelId, std::vector<Vertex *>> label_cleanup;
|
||||||
|
std::map<LabelId, std::vector<std::pair<PropertyValue, Vertex *>>> label_property_cleanup;
|
||||||
|
std::map<PropertyId, std::vector<std::pair<PropertyValue, Vertex *>>> property_cleanup;
|
||||||
|
|
||||||
|
for (const auto &delta : transaction_.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
switch (prev.type) {
|
switch (prev.type) {
|
||||||
case PreviousPtr::Type::VERTEX: {
|
case PreviousPtr::Type::VERTEX: {
|
||||||
@ -908,7 +1011,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
||||||
switch (current->action) {
|
switch (current->action) {
|
||||||
case Delta::Action::REMOVE_LABEL: {
|
case Delta::Action::REMOVE_LABEL: {
|
||||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label);
|
auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label.value);
|
||||||
MG_ASSERT(it != vertex->labels.end(), "Invalid database state!");
|
MG_ASSERT(it != vertex->labels.end(), "Invalid database state!");
|
||||||
std::swap(*it, *vertex->labels.rbegin());
|
std::swap(*it, *vertex->labels.rbegin());
|
||||||
vertex->labels.pop_back();
|
vertex->labels.pop_back();
|
||||||
@ -918,24 +1021,24 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
// For property label index
|
// For property label index
|
||||||
// check if we care about the label; this will return all the propertyIds we care about and then get
|
// check if we care about the label; this will return all the propertyIds we care about and then get
|
||||||
// the current property value
|
// the current property value
|
||||||
if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label)) {
|
if (std::binary_search(index_stats.label.begin(), index_stats.label.end(), current->label.value)) {
|
||||||
label_cleanup[current->label].emplace_back(vertex);
|
label_cleanup[current->label.value].emplace_back(vertex);
|
||||||
}
|
}
|
||||||
const auto &properties = index_stats.property_label.l2p.find(current->label);
|
const auto &properties = index_stats.property_label.l2p.find(current->label.value);
|
||||||
if (properties != index_stats.property_label.l2p.end()) {
|
if (properties != index_stats.property_label.l2p.end()) {
|
||||||
for (const auto &property : properties->second) {
|
for (const auto &property : properties->second) {
|
||||||
auto current_value = vertex->properties.GetProperty(property);
|
auto current_value = vertex->properties.GetProperty(property);
|
||||||
if (!current_value.IsNull()) {
|
if (!current_value.IsNull()) {
|
||||||
label_property_cleanup[current->label].emplace_back(std::move(current_value), vertex);
|
label_property_cleanup[current->label.value].emplace_back(std::move(current_value), vertex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::ADD_LABEL: {
|
case Delta::Action::ADD_LABEL: {
|
||||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label);
|
auto it = std::find(vertex->labels.begin(), vertex->labels.end(), current->label.value);
|
||||||
MG_ASSERT(it == vertex->labels.end(), "Invalid database state!");
|
MG_ASSERT(it == vertex->labels.end(), "Invalid database state!");
|
||||||
vertex->labels.push_back(current->label);
|
vertex->labels.push_back(current->label.value);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
@ -951,7 +1054,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Setting the correct value
|
// Setting the correct value
|
||||||
vertex->properties.SetProperty(current->property.key, current->property.value);
|
vertex->properties.SetProperty(current->property.key, *current->property.value);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::ADD_IN_EDGE: {
|
case Delta::Action::ADD_IN_EDGE: {
|
||||||
@ -1026,7 +1129,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
current->timestamp->load(std::memory_order_acquire) == transaction_.transaction_id) {
|
||||||
switch (current->action) {
|
switch (current->action) {
|
||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
edge->properties.SetProperty(current->property.key, current->property.value);
|
edge->properties.SetProperty(current->property.key, *current->property.value);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
case Delta::Action::DELETE_DESERIALIZED_OBJECT:
|
||||||
@ -1065,7 +1168,6 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
|
||||||
{
|
{
|
||||||
auto engine_guard = std::unique_lock(storage_->engine_lock_);
|
auto engine_guard = std::unique_lock(storage_->engine_lock_);
|
||||||
uint64_t mark_timestamp = storage_->timestamp_;
|
uint64_t mark_timestamp = storage_->timestamp_;
|
||||||
@ -1111,6 +1213,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
|
mem_storage->commit_log_->MarkFinished(transaction_.start_timestamp);
|
||||||
is_transaction_active_ = false;
|
is_transaction_active_ = false;
|
||||||
@ -1121,7 +1224,7 @@ void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() {
|
|||||||
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
|
||||||
mem_storage->commit_log_->MarkFinished(*commit_timestamp_);
|
mem_storage->commit_log_->MarkFinished(*commit_timestamp_);
|
||||||
|
|
||||||
if (!transaction_.deltas.use().empty()) {
|
if (!transaction_.deltas.empty()) {
|
||||||
// Only hand over delta to be GC'ed if there was any deltas
|
// Only hand over delta to be GC'ed if there was any deltas
|
||||||
mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) {
|
mem_storage->committed_transactions_.WithLock([&](auto &committed_transactions) {
|
||||||
// using mark of 0 as GC will assign a mark_timestamp after unlinking
|
// using mark of 0 as GC will assign a mark_timestamp after unlinking
|
||||||
@ -1462,7 +1565,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::ResourceLock> main_
|
|||||||
// chain in a broken state.
|
// chain in a broken state.
|
||||||
// The chain can be only read without taking any locks.
|
// The chain can be only read without taking any locks.
|
||||||
|
|
||||||
for (Delta &delta : linked_entry->deltas_.use()) {
|
for (Delta &delta : linked_entry->deltas_) {
|
||||||
while (true) {
|
while (true) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
switch (prev.type) {
|
switch (prev.type) {
|
||||||
@ -1781,7 +1884,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
|
|
||||||
// 1. Process all Vertex deltas and store all operations that create vertices
|
// 1. Process all Vertex deltas and store all operations that create vertices
|
||||||
// and modify vertex data.
|
// and modify vertex data.
|
||||||
for (const auto &delta : transaction.deltas.use()) {
|
for (const auto &delta : transaction.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||||
@ -1804,7 +1907,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
// 2. Process all Vertex deltas and store all operations that create edges.
|
// 2. Process all Vertex deltas and store all operations that create edges.
|
||||||
for (const auto &delta : transaction.deltas.use()) {
|
for (const auto &delta : transaction.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||||
@ -1826,7 +1929,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
// 3. Process all Edge deltas and store all operations that modify edge data.
|
// 3. Process all Edge deltas and store all operations that modify edge data.
|
||||||
for (const auto &delta : transaction.deltas.use()) {
|
for (const auto &delta : transaction.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||||
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
||||||
@ -1848,7 +1951,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
// 4. Process all Vertex deltas and store all operations that delete edges.
|
// 4. Process all Vertex deltas and store all operations that delete edges.
|
||||||
for (const auto &delta : transaction.deltas.use()) {
|
for (const auto &delta : transaction.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||||
@ -1870,7 +1973,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
// 5. Process all Vertex deltas and store all operations that delete vertices.
|
// 5. Process all Vertex deltas and store all operations that delete vertices.
|
||||||
for (const auto &delta : transaction.deltas.use()) {
|
for (const auto &delta : transaction.deltas) {
|
||||||
auto prev = delta.prev.Get();
|
auto prev = delta.prev.Get();
|
||||||
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
MG_ASSERT(prev.type != PreviousPtr::Type::NULLPTR, "Invalid pointer!");
|
||||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||||
@ -1894,7 +1997,7 @@ bool InMemoryStorage::AppendToWal(const Transaction &transaction, uint64_t final
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Handle MVCC deltas
|
// Handle MVCC deltas
|
||||||
if (!transaction.deltas.use().empty()) {
|
if (!transaction.deltas.empty()) {
|
||||||
append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) {
|
append_deltas([&](const Delta &delta, const auto &parent, uint64_t timestamp) {
|
||||||
wal_file_->AppendDelta(delta, parent, timestamp);
|
wal_file_->AppendDelta(delta, parent, timestamp);
|
||||||
repl_storage_state_.AppendDelta(delta, parent, timestamp);
|
repl_storage_state_.AppendDelta(delta, parent, timestamp);
|
||||||
|
@ -302,6 +302,9 @@ class InMemoryStorage final : public Storage {
|
|||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid);
|
Result<EdgeAccessor> CreateEdgeEx(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid);
|
||||||
|
|
||||||
|
/// Duiring commit, in some cases you do not need to hand over deltas to GC
|
||||||
|
/// in those cases this method is a light weight way to unlink and discard our deltas
|
||||||
|
void FastDiscardOfDeltas(uint64_t oldest_active_timestamp, std::unique_lock<std::mutex> gc_guard);
|
||||||
SalientConfig::Items config_;
|
SalientConfig::Items config_;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -429,16 +432,15 @@ class InMemoryStorage final : public Storage {
|
|||||||
utils::Scheduler gc_runner_;
|
utils::Scheduler gc_runner_;
|
||||||
std::mutex gc_lock_;
|
std::mutex gc_lock_;
|
||||||
|
|
||||||
using BondPmrLd = Bond<utils::pmr::list<Delta>>;
|
|
||||||
struct GCDeltas {
|
struct GCDeltas {
|
||||||
GCDeltas(uint64_t mark_timestamp, BondPmrLd deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp)
|
GCDeltas(uint64_t mark_timestamp, std::deque<Delta> deltas, std::unique_ptr<std::atomic<uint64_t>> commit_timestamp)
|
||||||
: mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {}
|
: mark_timestamp_{mark_timestamp}, deltas_{std::move(deltas)}, commit_timestamp_{std::move(commit_timestamp)} {}
|
||||||
|
|
||||||
GCDeltas(GCDeltas &&) = default;
|
GCDeltas(GCDeltas &&) = default;
|
||||||
GCDeltas &operator=(GCDeltas &&) = default;
|
GCDeltas &operator=(GCDeltas &&) = default;
|
||||||
|
|
||||||
uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has
|
uint64_t mark_timestamp_{}; //!< a timestamp no active transaction currently has
|
||||||
BondPmrLd deltas_; //!< the deltas that need cleaning
|
std::deque<Delta> deltas_; //!< the deltas that need cleaning
|
||||||
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp_{}; //!< the timestamp the deltas are pointing at
|
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp_{}; //!< the timestamp the deltas are pointing at
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
|
|||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
auto pos = FindPropertyPosition(property_array, delta->property.key);
|
auto pos = FindPropertyPosition(property_array, delta->property.key);
|
||||||
if (pos) {
|
if (pos) {
|
||||||
current_value_equal_to_value[*pos] = delta->property.value == value_array[*pos];
|
current_value_equal_to_value[*pos] = *delta->property.value == value_array[*pos];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -96,14 +96,14 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case Delta::Action::ADD_LABEL: {
|
case Delta::Action::ADD_LABEL: {
|
||||||
if (delta->label == label) {
|
if (delta->label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case Delta::Action::REMOVE_LABEL: {
|
case Delta::Action::REMOVE_LABEL: {
|
||||||
if (delta->label == label) {
|
if (delta->label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
break;
|
break;
|
||||||
@ -190,13 +190,13 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
|
|||||||
}
|
}
|
||||||
switch (delta->action) {
|
switch (delta->action) {
|
||||||
case Delta::Action::ADD_LABEL:
|
case Delta::Action::ADD_LABEL:
|
||||||
if (delta->label == label) {
|
if (delta->label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case Delta::Action::REMOVE_LABEL:
|
case Delta::Action::REMOVE_LABEL:
|
||||||
if (delta->label == label) {
|
if (delta->label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
}
|
}
|
||||||
@ -204,7 +204,7 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
|
|||||||
case Delta::Action::SET_PROPERTY: {
|
case Delta::Action::SET_PROPERTY: {
|
||||||
auto pos = FindPropertyPosition(property_array, delta->property.key);
|
auto pos = FindPropertyPosition(property_array, delta->property.key);
|
||||||
if (pos) {
|
if (pos) {
|
||||||
current_value_equal_to_value[*pos] = delta->property.value == values[*pos];
|
current_value_equal_to_value[*pos] = *delta->property.value == values[*pos];
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -114,7 +114,7 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
|||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
transaction->EnsureCommitTimestampExists();
|
transaction->EnsureCommitTimestampExists();
|
||||||
return &transaction->deltas.use().emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(),
|
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(), transaction->commit_timestamp.get(),
|
||||||
transaction->command_id);
|
transaction->command_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -133,19 +133,19 @@ inline Delta *CreateDeleteDeserializedObjectDelta(Transaction *transaction, std:
|
|||||||
transaction->EnsureCommitTimestampExists();
|
transaction->EnsureCommitTimestampExists();
|
||||||
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
|
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
|
||||||
uint64_t ts_id = utils::ParseStringToUint64(ts);
|
uint64_t ts_id = utils::ParseStringToUint64(ts);
|
||||||
return &transaction->deltas.use().emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, old_disk_key);
|
return &transaction->deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, std::move(old_disk_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
inline Delta *CreateDeleteDeserializedObjectDelta(std::list<Delta> *deltas, std::optional<std::string> old_disk_key,
|
inline Delta *CreateDeleteDeserializedObjectDelta(std::list<Delta> *deltas, std::optional<std::string> old_disk_key,
|
||||||
std::string &&ts) {
|
std::string &&ts) {
|
||||||
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
|
// Should use utils::DecodeFixed64(ts.c_str()) once we will move to RocksDB real timestamps
|
||||||
uint64_t ts_id = utils::ParseStringToUint64(ts);
|
uint64_t ts_id = utils::ParseStringToUint64(ts);
|
||||||
return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, old_disk_key);
|
return &deltas->emplace_back(Delta::DeleteDeserializedObjectTag(), ts_id, std::move(old_disk_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list<Delta> &deltas,
|
inline Delta *CreateDeleteDeserializedIndexObjectDelta(std::list<Delta> &deltas,
|
||||||
std::optional<std::string> old_disk_key, const uint64_t ts) {
|
std::optional<std::string> old_disk_key, const uint64_t ts) {
|
||||||
return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, old_disk_key);
|
return &deltas.emplace_back(Delta::DeleteDeserializedObjectTag(), ts, std::move(old_disk_key));
|
||||||
}
|
}
|
||||||
|
|
||||||
/// TODO: what if in-memory analytical
|
/// TODO: what if in-memory analytical
|
||||||
@ -165,7 +165,7 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object, Args &&..
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
transaction->EnsureCommitTimestampExists();
|
transaction->EnsureCommitTimestampExists();
|
||||||
auto delta = &transaction->deltas.use().emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(),
|
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)..., transaction->commit_timestamp.get(),
|
||||||
transaction->command_id);
|
transaction->command_id);
|
||||||
|
|
||||||
// The operations are written in such order so that both `next` and `prev`
|
// The operations are written in such order so that both `next` and `prev`
|
||||||
|
@ -57,38 +57,24 @@ class PropertyValue {
|
|||||||
PropertyValue() : type_(Type::Null) {}
|
PropertyValue() : type_(Type::Null) {}
|
||||||
|
|
||||||
// constructors for primitive types
|
// constructors for primitive types
|
||||||
explicit PropertyValue(const bool value) : type_(Type::Bool) { bool_v = value; }
|
explicit PropertyValue(const bool value) : bool_v{.val_ = value} {}
|
||||||
explicit PropertyValue(const int value) : type_(Type::Int) { int_v = value; }
|
explicit PropertyValue(const int value) : int_v{.val_ = value} {}
|
||||||
explicit PropertyValue(const int64_t value) : type_(Type::Int) { int_v = value; }
|
explicit PropertyValue(const int64_t value) : int_v{.val_ = value} {}
|
||||||
explicit PropertyValue(const double value) : type_(Type::Double) { double_v = value; }
|
explicit PropertyValue(const double value) : double_v{.val_ = value} {}
|
||||||
explicit PropertyValue(const TemporalData value) : type_{Type::TemporalData} { temporal_data_v = value; }
|
explicit PropertyValue(const TemporalData value) : temporal_data_v{.val_ = value} {}
|
||||||
|
|
||||||
// copy constructors for non-primitive types
|
// copy constructors for non-primitive types
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
explicit PropertyValue(const std::string &value) : type_(Type::String) { new (&string_v) std::string(value); }
|
explicit PropertyValue(std::string value) : string_v{.val_ = std::move(value)} {}
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
/// @throw std::length_error if length of value exceeds
|
/// @throw std::length_error if length of value exceeds
|
||||||
/// std::string::max_length().
|
/// std::string::max_length().
|
||||||
explicit PropertyValue(const char *value) : type_(Type::String) { new (&string_v) std::string(value); }
|
explicit PropertyValue(std::string_view value) : string_v{.val_ = std::string(value)} {}
|
||||||
|
explicit PropertyValue(char const *value) : string_v{.val_ = std::string(value)} {}
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
explicit PropertyValue(const std::vector<PropertyValue> &value) : type_(Type::List) {
|
explicit PropertyValue(std::vector<PropertyValue> value) : list_v{.val_ = std::move(value)} {}
|
||||||
new (&list_v) std::vector<PropertyValue>(value);
|
|
||||||
}
|
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
explicit PropertyValue(const std::map<std::string, PropertyValue> &value) : type_(Type::Map) {
|
explicit PropertyValue(std::map<std::string, PropertyValue> value) : map_v{.val_ = std::move(value)} {}
|
||||||
new (&map_v) std::map<std::string, PropertyValue>(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
// move constructors for non-primitive types
|
|
||||||
explicit PropertyValue(std::string &&value) noexcept : type_(Type::String) {
|
|
||||||
new (&string_v) std::string(std::move(value));
|
|
||||||
}
|
|
||||||
explicit PropertyValue(std::vector<PropertyValue> &&value) noexcept : type_(Type::List) {
|
|
||||||
new (&list_v) std::vector<PropertyValue>(std::move(value));
|
|
||||||
}
|
|
||||||
explicit PropertyValue(std::map<std::string, PropertyValue> &&value) noexcept : type_(Type::Map) {
|
|
||||||
new (&map_v) std::map<std::string, PropertyValue>(std::move(value));
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy constructor
|
// copy constructor
|
||||||
/// @throw std::bad_alloc
|
/// @throw std::bad_alloc
|
||||||
@ -126,21 +112,21 @@ class PropertyValue {
|
|||||||
if (type_ != Type::Bool) [[unlikely]] {
|
if (type_ != Type::Bool) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a bool!");
|
throw PropertyValueException("The value isn't a bool!");
|
||||||
}
|
}
|
||||||
return bool_v;
|
return bool_v.val_;
|
||||||
}
|
}
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
int64_t ValueInt() const {
|
int64_t ValueInt() const {
|
||||||
if (type_ != Type::Int) [[unlikely]] {
|
if (type_ != Type::Int) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't an int!");
|
throw PropertyValueException("The value isn't an int!");
|
||||||
}
|
}
|
||||||
return int_v;
|
return int_v.val_;
|
||||||
}
|
}
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
double ValueDouble() const {
|
double ValueDouble() const {
|
||||||
if (type_ != Type::Double) [[unlikely]] {
|
if (type_ != Type::Double) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a double!");
|
throw PropertyValueException("The value isn't a double!");
|
||||||
}
|
}
|
||||||
return double_v;
|
return double_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
@ -149,7 +135,7 @@ class PropertyValue {
|
|||||||
throw PropertyValueException("The value isn't a temporal data!");
|
throw PropertyValueException("The value isn't a temporal data!");
|
||||||
}
|
}
|
||||||
|
|
||||||
return temporal_data_v;
|
return temporal_data_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// const value getters for non-primitive types
|
// const value getters for non-primitive types
|
||||||
@ -158,7 +144,7 @@ class PropertyValue {
|
|||||||
if (type_ != Type::String) [[unlikely]] {
|
if (type_ != Type::String) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a string!");
|
throw PropertyValueException("The value isn't a string!");
|
||||||
}
|
}
|
||||||
return string_v;
|
return string_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
@ -166,7 +152,7 @@ class PropertyValue {
|
|||||||
if (type_ != Type::List) [[unlikely]] {
|
if (type_ != Type::List) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a list!");
|
throw PropertyValueException("The value isn't a list!");
|
||||||
}
|
}
|
||||||
return list_v;
|
return list_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
@ -174,7 +160,7 @@ class PropertyValue {
|
|||||||
if (type_ != Type::Map) [[unlikely]] {
|
if (type_ != Type::Map) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a map!");
|
throw PropertyValueException("The value isn't a map!");
|
||||||
}
|
}
|
||||||
return map_v;
|
return map_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// reference value getters for non-primitive types
|
// reference value getters for non-primitive types
|
||||||
@ -183,7 +169,7 @@ class PropertyValue {
|
|||||||
if (type_ != Type::String) [[unlikely]] {
|
if (type_ != Type::String) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a string!");
|
throw PropertyValueException("The value isn't a string!");
|
||||||
}
|
}
|
||||||
return string_v;
|
return string_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
@ -191,7 +177,7 @@ class PropertyValue {
|
|||||||
if (type_ != Type::List) [[unlikely]] {
|
if (type_ != Type::List) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a list!");
|
throw PropertyValueException("The value isn't a list!");
|
||||||
}
|
}
|
||||||
return list_v;
|
return list_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @throw PropertyValueException if value isn't of correct type.
|
/// @throw PropertyValueException if value isn't of correct type.
|
||||||
@ -199,23 +185,45 @@ class PropertyValue {
|
|||||||
if (type_ != Type::Map) [[unlikely]] {
|
if (type_ != Type::Map) [[unlikely]] {
|
||||||
throw PropertyValueException("The value isn't a map!");
|
throw PropertyValueException("The value isn't a map!");
|
||||||
}
|
}
|
||||||
return map_v;
|
return map_v.val_;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void DestroyValue() noexcept;
|
void DestroyValue() noexcept;
|
||||||
|
|
||||||
|
// NOTE: this may look strange but it is for better data layout
|
||||||
|
// https://eel.is/c++draft/class.union#general-note-1
|
||||||
union {
|
union {
|
||||||
bool bool_v;
|
|
||||||
int64_t int_v;
|
|
||||||
double double_v;
|
|
||||||
std::string string_v;
|
|
||||||
std::vector<PropertyValue> list_v;
|
|
||||||
std::map<std::string, PropertyValue> map_v;
|
|
||||||
TemporalData temporal_data_v;
|
|
||||||
};
|
|
||||||
|
|
||||||
Type type_;
|
Type type_;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::Bool;
|
||||||
|
bool val_;
|
||||||
|
} bool_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::Int;
|
||||||
|
int64_t val_;
|
||||||
|
} int_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::Double;
|
||||||
|
double val_;
|
||||||
|
} double_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::String;
|
||||||
|
std::string val_;
|
||||||
|
} string_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::List;
|
||||||
|
std::vector<PropertyValue> val_;
|
||||||
|
} list_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::Map;
|
||||||
|
std::map<std::string, PropertyValue> val_;
|
||||||
|
} map_v;
|
||||||
|
struct {
|
||||||
|
Type type_ = Type::TemporalData;
|
||||||
|
TemporalData val_;
|
||||||
|
} temporal_data_v;
|
||||||
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
// stream output
|
// stream output
|
||||||
@ -340,25 +348,25 @@ inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.ty
|
|||||||
case Type::Null:
|
case Type::Null:
|
||||||
return;
|
return;
|
||||||
case Type::Bool:
|
case Type::Bool:
|
||||||
this->bool_v = other.bool_v;
|
this->bool_v.val_ = other.bool_v.val_;
|
||||||
return;
|
return;
|
||||||
case Type::Int:
|
case Type::Int:
|
||||||
this->int_v = other.int_v;
|
this->int_v.val_ = other.int_v.val_;
|
||||||
return;
|
return;
|
||||||
case Type::Double:
|
case Type::Double:
|
||||||
this->double_v = other.double_v;
|
this->double_v.val_ = other.double_v.val_;
|
||||||
return;
|
return;
|
||||||
case Type::String:
|
case Type::String:
|
||||||
new (&string_v) std::string(other.string_v);
|
new (&string_v.val_) std::string(other.string_v.val_);
|
||||||
return;
|
return;
|
||||||
case Type::List:
|
case Type::List:
|
||||||
new (&list_v) std::vector<PropertyValue>(other.list_v);
|
new (&list_v.val_) std::vector<PropertyValue>(other.list_v.val_);
|
||||||
return;
|
return;
|
||||||
case Type::Map:
|
case Type::Map:
|
||||||
new (&map_v) std::map<std::string, PropertyValue>(other.map_v);
|
new (&map_v.val_) std::map<std::string, PropertyValue>(other.map_v.val_);
|
||||||
return;
|
return;
|
||||||
case Type::TemporalData:
|
case Type::TemporalData:
|
||||||
this->temporal_data_v = other.temporal_data_v;
|
this->temporal_data_v.val_ = other.temporal_data_v.val_;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -368,28 +376,28 @@ inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std:
|
|||||||
case Type::Null:
|
case Type::Null:
|
||||||
break;
|
break;
|
||||||
case Type::Bool:
|
case Type::Bool:
|
||||||
bool_v = other.bool_v;
|
bool_v.val_ = other.bool_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Int:
|
case Type::Int:
|
||||||
int_v = other.int_v;
|
int_v.val_ = other.int_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Double:
|
case Type::Double:
|
||||||
double_v = other.double_v;
|
double_v.val_ = other.double_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::String:
|
case Type::String:
|
||||||
std::construct_at(&string_v, std::move(other.string_v));
|
std::construct_at(&string_v.val_, std::move(other.string_v.val_));
|
||||||
std::destroy_at(&other.string_v);
|
std::destroy_at(&other.string_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::List:
|
case Type::List:
|
||||||
std::construct_at(&list_v, std::move(other.list_v));
|
std::construct_at(&list_v.val_, std::move(other.list_v.val_));
|
||||||
std::destroy_at(&other.list_v);
|
std::destroy_at(&other.list_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::Map:
|
case Type::Map:
|
||||||
std::construct_at(&map_v, std::move(other.map_v));
|
std::construct_at(&map_v.val_, std::move(other.map_v.val_));
|
||||||
std::destroy_at(&other.map_v);
|
std::destroy_at(&other.map_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::TemporalData:
|
case Type::TemporalData:
|
||||||
temporal_data_v = other.temporal_data_v;
|
temporal_data_v.val_ = other.temporal_data_v.val_;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -404,25 +412,25 @@ inline PropertyValue &PropertyValue::operator=(const PropertyValue &other) {
|
|||||||
case Type::Null:
|
case Type::Null:
|
||||||
break;
|
break;
|
||||||
case Type::Bool:
|
case Type::Bool:
|
||||||
this->bool_v = other.bool_v;
|
this->bool_v.val_ = other.bool_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Int:
|
case Type::Int:
|
||||||
this->int_v = other.int_v;
|
this->int_v.val_ = other.int_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Double:
|
case Type::Double:
|
||||||
this->double_v = other.double_v;
|
this->double_v.val_ = other.double_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::String:
|
case Type::String:
|
||||||
new (&string_v) std::string(other.string_v);
|
new (&string_v.val_) std::string(other.string_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::List:
|
case Type::List:
|
||||||
new (&list_v) std::vector<PropertyValue>(other.list_v);
|
new (&list_v.val_) std::vector<PropertyValue>(other.list_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::Map:
|
case Type::Map:
|
||||||
new (&map_v) std::map<std::string, PropertyValue>(other.map_v);
|
new (&map_v.val_) std::map<std::string, PropertyValue>(other.map_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::TemporalData:
|
case Type::TemporalData:
|
||||||
this->temporal_data_v = other.temporal_data_v;
|
this->temporal_data_v.val_ = other.temporal_data_v.val_;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -438,28 +446,28 @@ inline PropertyValue &PropertyValue::operator=(PropertyValue &&other) noexcept {
|
|||||||
case Type::Null:
|
case Type::Null:
|
||||||
break;
|
break;
|
||||||
case Type::Bool:
|
case Type::Bool:
|
||||||
bool_v = other.bool_v;
|
bool_v.val_ = other.bool_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Int:
|
case Type::Int:
|
||||||
int_v = other.int_v;
|
int_v.val_ = other.int_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::Double:
|
case Type::Double:
|
||||||
double_v = other.double_v;
|
double_v.val_ = other.double_v.val_;
|
||||||
break;
|
break;
|
||||||
case Type::String:
|
case Type::String:
|
||||||
string_v = std::move(other.string_v);
|
string_v.val_ = std::move(other.string_v.val_);
|
||||||
std::destroy_at(&other.string_v);
|
std::destroy_at(&other.string_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::List:
|
case Type::List:
|
||||||
list_v = std::move(other.list_v);
|
list_v.val_ = std::move(other.list_v.val_);
|
||||||
std::destroy_at(&other.list_v);
|
std::destroy_at(&other.list_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::Map:
|
case Type::Map:
|
||||||
map_v = std::move(other.map_v);
|
map_v.val_ = std::move(other.map_v.val_);
|
||||||
std::destroy_at(&other.map_v);
|
std::destroy_at(&other.map_v.val_);
|
||||||
break;
|
break;
|
||||||
case Type::TemporalData:
|
case Type::TemporalData:
|
||||||
temporal_data_v = other.temporal_data_v;
|
temporal_data_v.val_ = other.temporal_data_v.val_;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
other.type_ = Type::Null;
|
other.type_ = Type::Null;
|
||||||
@ -482,13 +490,13 @@ inline void PropertyValue::DestroyValue() noexcept {
|
|||||||
|
|
||||||
// destructor for non primitive types since we used placement new
|
// destructor for non primitive types since we used placement new
|
||||||
case Type::String:
|
case Type::String:
|
||||||
std::destroy_at(&string_v);
|
std::destroy_at(&string_v.val_);
|
||||||
return;
|
return;
|
||||||
case Type::List:
|
case Type::List:
|
||||||
std::destroy_at(&list_v);
|
std::destroy_at(&list_v.val_);
|
||||||
return;
|
return;
|
||||||
case Type::Map:
|
case Type::Map:
|
||||||
std::destroy_at(&map_v);
|
std::destroy_at(&map_v.val_);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -13,7 +13,6 @@
|
|||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
#include <limits>
|
#include <limits>
|
||||||
#include <list>
|
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
#include "utils/memory.hpp"
|
#include "utils/memory.hpp"
|
||||||
@ -39,7 +38,6 @@ namespace memgraph::storage {
|
|||||||
|
|
||||||
const uint64_t kTimestampInitialId = 0;
|
const uint64_t kTimestampInitialId = 0;
|
||||||
const uint64_t kTransactionInitialId = 1ULL << 63U;
|
const uint64_t kTransactionInitialId = 1ULL << 63U;
|
||||||
using PmrListDelta = utils::pmr::list<Delta>;
|
|
||||||
|
|
||||||
struct Transaction {
|
struct Transaction {
|
||||||
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
|
Transaction(uint64_t transaction_id, uint64_t start_timestamp, IsolationLevel isolation_level,
|
||||||
@ -47,7 +45,6 @@ struct Transaction {
|
|||||||
: transaction_id(transaction_id),
|
: transaction_id(transaction_id),
|
||||||
start_timestamp(start_timestamp),
|
start_timestamp(start_timestamp),
|
||||||
command_id(0),
|
command_id(0),
|
||||||
deltas(0),
|
|
||||||
md_deltas(utils::NewDeleteResource()),
|
md_deltas(utils::NewDeleteResource()),
|
||||||
must_abort(false),
|
must_abort(false),
|
||||||
isolation_level(isolation_level),
|
isolation_level(isolation_level),
|
||||||
@ -91,7 +88,7 @@ struct Transaction {
|
|||||||
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp{};
|
std::unique_ptr<std::atomic<uint64_t>> commit_timestamp{};
|
||||||
uint64_t command_id{};
|
uint64_t command_id{};
|
||||||
|
|
||||||
Bond<PmrListDelta> deltas;
|
std::deque<Delta> deltas;
|
||||||
utils::pmr::list<MetadataDelta> md_deltas;
|
utils::pmr::list<MetadataDelta> md_deltas;
|
||||||
bool must_abort{};
|
bool must_abort{};
|
||||||
IsolationLevel isolation_level{};
|
IsolationLevel isolation_level{};
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -76,13 +76,13 @@ inline auto HasLabel_ActionMethod(bool &has_label, LabelId label) {
|
|||||||
// clang-format off
|
// clang-format off
|
||||||
return utils::Overloaded{
|
return utils::Overloaded{
|
||||||
ActionMethod<REMOVE_LABEL>([&, label](Delta const &delta) {
|
ActionMethod<REMOVE_LABEL>([&, label](Delta const &delta) {
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(has_label, "Invalid database state!");
|
MG_ASSERT(has_label, "Invalid database state!");
|
||||||
has_label = false;
|
has_label = false;
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
ActionMethod<ADD_LABEL>([&, label](Delta const &delta) {
|
ActionMethod<ADD_LABEL>([&, label](Delta const &delta) {
|
||||||
if (delta.label == label) {
|
if (delta.label.value == label) {
|
||||||
MG_ASSERT(!has_label, "Invalid database state!");
|
MG_ASSERT(!has_label, "Invalid database state!");
|
||||||
has_label = true;
|
has_label = true;
|
||||||
}
|
}
|
||||||
@ -96,14 +96,14 @@ inline auto Labels_ActionMethod(std::vector<LabelId> &labels) {
|
|||||||
// clang-format off
|
// clang-format off
|
||||||
return utils::Overloaded{
|
return utils::Overloaded{
|
||||||
ActionMethod<REMOVE_LABEL>([&](Delta const &delta) {
|
ActionMethod<REMOVE_LABEL>([&](Delta const &delta) {
|
||||||
auto it = std::find(labels.begin(), labels.end(), delta.label);
|
auto it = std::find(labels.begin(), labels.end(), delta.label.value);
|
||||||
DMG_ASSERT(it != labels.end(), "Invalid database state!");
|
DMG_ASSERT(it != labels.end(), "Invalid database state!");
|
||||||
*it = labels.back();
|
*it = labels.back();
|
||||||
labels.pop_back();
|
labels.pop_back();
|
||||||
}),
|
}),
|
||||||
ActionMethod<ADD_LABEL>([&](Delta const &delta) {
|
ActionMethod<ADD_LABEL>([&](Delta const &delta) {
|
||||||
DMG_ASSERT(std::find(labels.begin(), labels.end(), delta.label) == labels.end(), "Invalid database state!");
|
DMG_ASSERT(std::find(labels.begin(), labels.end(), delta.label.value) == labels.end(), "Invalid database state!");
|
||||||
labels.emplace_back(delta.label);
|
labels.emplace_back(delta.label.value);
|
||||||
})
|
})
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
@ -113,7 +113,7 @@ inline auto PropertyValue_ActionMethod(PropertyValue &value, PropertyId property
|
|||||||
using enum Delta::Action;
|
using enum Delta::Action;
|
||||||
return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
|
return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
|
||||||
if (delta.property.key == property) {
|
if (delta.property.key == property) {
|
||||||
value = delta.property.value;
|
value = *delta.property.value;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -121,7 +121,7 @@ inline auto PropertyValue_ActionMethod(PropertyValue &value, PropertyId property
|
|||||||
inline auto PropertyValueMatch_ActionMethod(bool &match, PropertyId property, PropertyValue const &value) {
|
inline auto PropertyValueMatch_ActionMethod(bool &match, PropertyId property, PropertyValue const &value) {
|
||||||
using enum Delta::Action;
|
using enum Delta::Action;
|
||||||
return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
|
return ActionMethod<SET_PROPERTY>([&, property](Delta const &delta) {
|
||||||
if (delta.property.key == property) match = (value == delta.property.value);
|
if (delta.property.key == property) match = (value == *delta.property.value);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -130,15 +130,15 @@ inline auto Properties_ActionMethod(std::map<PropertyId, PropertyValue> &propert
|
|||||||
return ActionMethod<SET_PROPERTY>([&](Delta const &delta) {
|
return ActionMethod<SET_PROPERTY>([&](Delta const &delta) {
|
||||||
auto it = properties.find(delta.property.key);
|
auto it = properties.find(delta.property.key);
|
||||||
if (it != properties.end()) {
|
if (it != properties.end()) {
|
||||||
if (delta.property.value.IsNull()) {
|
if (delta.property.value->IsNull()) {
|
||||||
// remove the property
|
// remove the property
|
||||||
properties.erase(it);
|
properties.erase(it);
|
||||||
} else {
|
} else {
|
||||||
// set the value
|
// set the value
|
||||||
it->second = delta.property.value;
|
it->second = *delta.property.value;
|
||||||
}
|
}
|
||||||
} else if (!delta.property.value.IsNull()) {
|
} else if (!delta.property.value->IsNull()) {
|
||||||
properties.emplace(delta.property.key, delta.property.value);
|
properties.emplace(delta.property.key, *delta.property.value);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -21,7 +21,7 @@ inline std::optional<std::string> GetOldDiskKeyOrNull(storage::Delta *head) {
|
|||||||
head = head->next;
|
head = head->next;
|
||||||
}
|
}
|
||||||
if (head->action == storage::Delta::Action::DELETE_DESERIALIZED_OBJECT) {
|
if (head->action == storage::Delta::Action::DELETE_DESERIALIZED_OBJECT) {
|
||||||
return head->old_disk_key;
|
return head->old_disk_key.value;
|
||||||
}
|
}
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
@ -338,6 +338,13 @@ inline uint64_t ParseStringToUint64(const std::string_view s) {
|
|||||||
throw utils::ParseException(s);
|
throw utils::ParseException(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline uint32_t ParseStringToUint32(const std::string_view s) {
|
||||||
|
if (uint32_t value = 0; std::from_chars(s.data(), s.data() + s.size(), value).ec == std::errc{}) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
throw utils::ParseException(s);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parse a double floating point value from a string using classic locale.
|
* Parse a double floating point value from a string using classic locale.
|
||||||
* Note, the current implementation copies the given string which may perform a
|
* Note, the current implementation copies the given string which may perform a
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -419,9 +419,9 @@ TEST(PropertyStore, IntEncoding) {
|
|||||||
{memgraph::storage::PropertyId::FromUint(1048576UL), memgraph::storage::PropertyValue(1048576L)},
|
{memgraph::storage::PropertyId::FromUint(1048576UL), memgraph::storage::PropertyValue(1048576L)},
|
||||||
{memgraph::storage::PropertyId::FromUint(std::numeric_limits<uint32_t>::max()),
|
{memgraph::storage::PropertyId::FromUint(std::numeric_limits<uint32_t>::max()),
|
||||||
memgraph::storage::PropertyValue(std::numeric_limits<int32_t>::max())},
|
memgraph::storage::PropertyValue(std::numeric_limits<int32_t>::max())},
|
||||||
{memgraph::storage::PropertyId::FromUint(4294967296UL), memgraph::storage::PropertyValue(4294967296L)},
|
{memgraph::storage::PropertyId::FromUint(1048577UL), memgraph::storage::PropertyValue(4294967296L)},
|
||||||
{memgraph::storage::PropertyId::FromUint(137438953472UL), memgraph::storage::PropertyValue(137438953472L)},
|
{memgraph::storage::PropertyId::FromUint(1048578UL), memgraph::storage::PropertyValue(137438953472L)},
|
||||||
{memgraph::storage::PropertyId::FromUint(std::numeric_limits<uint64_t>::max()),
|
{memgraph::storage::PropertyId::FromUint(std::numeric_limits<uint32_t>::max()),
|
||||||
memgraph::storage::PropertyValue(std::numeric_limits<int64_t>::max())}};
|
memgraph::storage::PropertyValue(std::numeric_limits<int64_t>::max())}};
|
||||||
|
|
||||||
memgraph::storage::PropertyStore props;
|
memgraph::storage::PropertyStore props;
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
// Copyright 2023 Memgraph Ltd.
|
// Copyright 2024 Memgraph Ltd.
|
||||||
//
|
//
|
||||||
// Use of this software is governed by the Business Source License
|
// Use of this software is governed by the Business Source License
|
||||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||||
@ -154,8 +154,8 @@ class DeltaGenerator final {
|
|||||||
|
|
||||||
void Finalize(bool append_transaction_end = true) {
|
void Finalize(bool append_transaction_end = true) {
|
||||||
auto commit_timestamp = gen_->timestamp_++;
|
auto commit_timestamp = gen_->timestamp_++;
|
||||||
if (transaction_.deltas.use().empty()) return;
|
if (transaction_.deltas.empty()) return;
|
||||||
for (const auto &delta : transaction_.deltas.use()) {
|
for (const auto &delta : transaction_.deltas) {
|
||||||
auto owner = delta.prev.Get();
|
auto owner = delta.prev.Get();
|
||||||
while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) {
|
while (owner.type == memgraph::storage::PreviousPtr::Type::DELTA) {
|
||||||
owner = owner.delta->prev.Get();
|
owner = owner.delta->prev.Get();
|
||||||
@ -171,7 +171,7 @@ class DeltaGenerator final {
|
|||||||
if (append_transaction_end) {
|
if (append_transaction_end) {
|
||||||
gen_->wal_file_.AppendTransactionEnd(commit_timestamp);
|
gen_->wal_file_.AppendTransactionEnd(commit_timestamp);
|
||||||
if (gen_->valid_) {
|
if (gen_->valid_) {
|
||||||
gen_->UpdateStats(commit_timestamp, transaction_.deltas.use().size() + 1);
|
gen_->UpdateStats(commit_timestamp, transaction_.deltas.size() + 1);
|
||||||
for (auto &data : data_) {
|
for (auto &data : data_) {
|
||||||
if (data.type == memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
|
if (data.type == memgraph::storage::durability::WalDeltaData::Type::VERTEX_SET_PROPERTY) {
|
||||||
// We need to put the final property value into the SET_PROPERTY
|
// We need to put the final property value into the SET_PROPERTY
|
||||||
|
Loading…
Reference in New Issue
Block a user