Refactor storage delta in storage v2
Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2171
This commit is contained in:
parent
db72ef05f8
commit
0833a74fd8
@ -15,41 +15,104 @@ struct Delta {
|
||||
SET_PROPERTY,
|
||||
};
|
||||
|
||||
Delta(Action action, uint64_t key, const PropertyValue &value,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(action),
|
||||
key(key),
|
||||
value(value),
|
||||
struct DeleteObjectTag {};
|
||||
struct RecreateObjectTag {};
|
||||
struct AddLabelTag {};
|
||||
struct RemoveLabelTag {};
|
||||
struct SetPropertyTag {};
|
||||
|
||||
Delta(DeleteObjectTag, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::DELETE_OBJECT),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id) {}
|
||||
|
||||
Delta(RecreateObjectTag, std::atomic<uint64_t> *timestamp,
|
||||
uint64_t command_id)
|
||||
: action(Action::RECREATE_OBJECT),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id) {}
|
||||
|
||||
Delta(AddLabelTag, uint64_t label, std::atomic<uint64_t> *timestamp,
|
||||
uint64_t command_id)
|
||||
: action(Action::ADD_LABEL),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
prev(nullptr),
|
||||
next(nullptr) {}
|
||||
label(label) {}
|
||||
|
||||
Delta(RemoveLabelTag, uint64_t label, std::atomic<uint64_t> *timestamp,
|
||||
uint64_t command_id)
|
||||
: action(Action::REMOVE_LABEL),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
label(label) {}
|
||||
|
||||
Delta(SetPropertyTag, uint64_t key, const PropertyValue &value,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::SET_PROPERTY),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
property({key, value}) {}
|
||||
|
||||
Delta(Delta &&other) noexcept
|
||||
: action(other.action),
|
||||
key(other.key),
|
||||
value(std::move(other.value)),
|
||||
timestamp(other.timestamp),
|
||||
command_id(other.command_id),
|
||||
prev(other.prev),
|
||||
next(other.next.load()) {}
|
||||
next(other.next.load()) {
|
||||
switch (other.action) {
|
||||
case Action::DELETE_OBJECT:
|
||||
case Action::RECREATE_OBJECT:
|
||||
break;
|
||||
case Action::ADD_LABEL:
|
||||
case Action::REMOVE_LABEL:
|
||||
label = other.label;
|
||||
break;
|
||||
case Action::SET_PROPERTY:
|
||||
property.key = other.property.key;
|
||||
new (&property.value) PropertyValue(std::move(other.property.value));
|
||||
break;
|
||||
}
|
||||
|
||||
// reset the action of other
|
||||
other.DestroyValue();
|
||||
other.action = Action::DELETE_OBJECT;
|
||||
}
|
||||
|
||||
Delta(const Delta &) = delete;
|
||||
Delta &operator=(const Delta &) = delete;
|
||||
Delta &operator=(Delta &&other) = delete;
|
||||
|
||||
~Delta() {}
|
||||
~Delta() { DestroyValue(); }
|
||||
|
||||
Action action;
|
||||
|
||||
uint64_t key; // Used as the label id or the property id.
|
||||
PropertyValue value; // Used as the property value (only for SET_PROPERTY).
|
||||
|
||||
// TODO: optimize with in-place copy
|
||||
std::atomic<uint64_t> *timestamp;
|
||||
uint64_t command_id;
|
||||
Delta *prev;
|
||||
std::atomic<Delta *> next;
|
||||
Delta *prev{nullptr};
|
||||
std::atomic<Delta *> next{nullptr};
|
||||
|
||||
union {
|
||||
uint64_t label;
|
||||
struct {
|
||||
uint64_t key;
|
||||
storage::PropertyValue value;
|
||||
} property;
|
||||
};
|
||||
|
||||
private:
|
||||
void DestroyValue() {
|
||||
switch (action) {
|
||||
case Action::DELETE_OBJECT:
|
||||
case Action::RECREATE_OBJECT:
|
||||
case Action::ADD_LABEL:
|
||||
case Action::REMOVE_LABEL:
|
||||
break;
|
||||
case Action::SET_PROPERTY:
|
||||
property.value.~PropertyValue();
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace storage
|
||||
|
@ -64,12 +64,12 @@ inline bool PrepareForWrite(Transaction *transaction, Vertex *vertex) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/// This function creates a delta in the transaction and returns a pointer to
|
||||
/// the created delta. It doesn't perform any linking of the delta and is
|
||||
/// primarily used to create the first delta for an object.
|
||||
inline Delta *CreateDelta(Transaction *transaction, Delta::Action action,
|
||||
uint64_t key) {
|
||||
return &transaction->deltas.emplace_back(action, key, PropertyValue(),
|
||||
/// This function creates a `DELETE_OBJECT` delta in the transaction and returns
|
||||
/// a pointer to the created delta. It doesn't perform any linking of the delta
|
||||
/// and is primarily used to create the first delta for an object (that must be
|
||||
/// a `DELETE_OBJECT` delta).
|
||||
inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
||||
return &transaction->deltas.emplace_back(Delta::DeleteObjectTag(),
|
||||
&transaction->commit_timestamp,
|
||||
transaction->command_id);
|
||||
}
|
||||
@ -77,10 +77,10 @@ inline Delta *CreateDelta(Transaction *transaction, Delta::Action action,
|
||||
/// This function creates a delta in the transaction for the Vertex object and
|
||||
/// links the delta into the Vertex's delta list. It also adds the Vertex to the
|
||||
/// transaction's modified vertices list.
|
||||
template <class... Args>
|
||||
inline void CreateAndLinkDelta(Transaction *transaction, Vertex *vertex,
|
||||
Delta::Action action, uint64_t key,
|
||||
const PropertyValue &value = PropertyValue()) {
|
||||
auto delta = &transaction->deltas.emplace_back(action, key, value,
|
||||
Args &&... args) {
|
||||
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)...,
|
||||
&transaction->commit_timestamp,
|
||||
transaction->command_id);
|
||||
|
||||
|
@ -81,7 +81,7 @@ class Storage final {
|
||||
VertexAccessor CreateVertex() {
|
||||
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||
auto acc = storage_->vertices_.access();
|
||||
auto delta = CreateDelta(transaction_, Delta::Action::DELETE_OBJECT, 0);
|
||||
auto delta = CreateDeleteObjectDelta(transaction_);
|
||||
auto [it, inserted] =
|
||||
acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
|
||||
CHECK(inserted) << "The vertex must be inserted here!";
|
||||
@ -128,7 +128,7 @@ class Storage final {
|
||||
switch (current->action) {
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->key);
|
||||
current->label);
|
||||
CHECK(it != vertex->labels.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->labels.rbegin());
|
||||
vertex->labels.pop_back();
|
||||
@ -136,23 +136,24 @@ class Storage final {
|
||||
}
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->key);
|
||||
current->label);
|
||||
CHECK(it == vertex->labels.end()) << "Invalid database state!";
|
||||
vertex->labels.push_back(current->key);
|
||||
vertex->labels.push_back(current->label);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = vertex->properties.find(current->key);
|
||||
auto it = vertex->properties.find(current->property.key);
|
||||
if (it != vertex->properties.end()) {
|
||||
if (current->value.IsNull()) {
|
||||
if (current->property.value.IsNull()) {
|
||||
// remove the property
|
||||
vertex->properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = current->value;
|
||||
it->second = current->property.value;
|
||||
}
|
||||
} else if (!current->value.IsNull()) {
|
||||
vertex->properties.emplace(current->key, current->value);
|
||||
} else if (!current->property.value.IsNull()) {
|
||||
vertex->properties.emplace(current->property.key,
|
||||
current->property.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -57,8 +57,7 @@ class VertexAccessor final {
|
||||
|
||||
if (vertex_->deleted) return Result<bool>{false};
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::Action::RECREATE_OBJECT,
|
||||
0);
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::RecreateObjectTag());
|
||||
|
||||
vertex_->deleted = true;
|
||||
|
||||
@ -77,8 +76,7 @@ class VertexAccessor final {
|
||||
vertex_->labels.end())
|
||||
return Result<bool>{false};
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::Action::REMOVE_LABEL,
|
||||
label);
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
|
||||
|
||||
vertex_->labels.push_back(label);
|
||||
return Result<bool>{true};
|
||||
@ -95,7 +93,7 @@ class VertexAccessor final {
|
||||
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
|
||||
if (it == vertex_->labels.end()) return Result<bool>{false};
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::Action::ADD_LABEL, label);
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label);
|
||||
|
||||
std::swap(*it, *vertex_->labels.rbegin());
|
||||
vertex_->labels.pop_back();
|
||||
@ -117,14 +115,14 @@ class VertexAccessor final {
|
||||
[&deleted, &has_label, label](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
if (delta.key == label) {
|
||||
if (delta.label == label) {
|
||||
CHECK(has_label) << "Invalid database state!";
|
||||
has_label = false;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
if (delta.key == label) {
|
||||
if (delta.label == label) {
|
||||
CHECK(!has_label) << "Invalid database state!";
|
||||
has_label = true;
|
||||
}
|
||||
@ -161,7 +159,7 @@ class VertexAccessor final {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
// Remove the label because we don't see the addition.
|
||||
auto it = std::find(labels.begin(), labels.end(), delta.key);
|
||||
auto it = std::find(labels.begin(), labels.end(), delta.label);
|
||||
CHECK(it != labels.end()) << "Invalid database state!";
|
||||
std::swap(*it, *labels.rbegin());
|
||||
labels.pop_back();
|
||||
@ -169,9 +167,9 @@ class VertexAccessor final {
|
||||
}
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
// Add the label because we don't see the removal.
|
||||
auto it = std::find(labels.begin(), labels.end(), delta.key);
|
||||
auto it = std::find(labels.begin(), labels.end(), delta.label);
|
||||
CHECK(it == labels.end()) << "Invalid database state!";
|
||||
labels.push_back(delta.key);
|
||||
labels.push_back(delta.label);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
@ -201,7 +199,7 @@ class VertexAccessor final {
|
||||
auto it = vertex_->properties.find(property);
|
||||
bool existed = it != vertex_->properties.end();
|
||||
if (it != vertex_->properties.end()) {
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::Action::SET_PROPERTY,
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(),
|
||||
property, it->second);
|
||||
if (value.IsNull()) {
|
||||
// remove the property
|
||||
@ -211,7 +209,7 @@ class VertexAccessor final {
|
||||
it->second = value;
|
||||
}
|
||||
} else {
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::Action::SET_PROPERTY,
|
||||
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(),
|
||||
property, PropertyValue());
|
||||
if (!value.IsNull()) {
|
||||
vertex_->properties.emplace(property, value);
|
||||
@ -238,8 +236,8 @@ class VertexAccessor final {
|
||||
[&deleted, &value, property](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
if (delta.key == property) {
|
||||
value = delta.value;
|
||||
if (delta.property.key == property) {
|
||||
value = delta.property.value;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -270,37 +268,37 @@ class VertexAccessor final {
|
||||
properties = vertex_->properties;
|
||||
delta = vertex_->delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction_, delta, view,
|
||||
[&deleted, &properties](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = properties.find(delta.key);
|
||||
if (it != properties.end()) {
|
||||
if (delta.value.IsNull()) {
|
||||
// remove the property
|
||||
properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = delta.value;
|
||||
}
|
||||
} else if (!delta.value.IsNull()) {
|
||||
properties.emplace(delta.key, delta.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
LOG(FATAL) << "Invalid accessor!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
break;
|
||||
}
|
||||
});
|
||||
ApplyDeltasForRead(
|
||||
transaction_, delta, view, [&deleted, &properties](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = properties.find(delta.property.key);
|
||||
if (it != properties.end()) {
|
||||
if (delta.property.value.IsNull()) {
|
||||
// remove the property
|
||||
properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = delta.property.value;
|
||||
}
|
||||
} else if (!delta.property.value.IsNull()) {
|
||||
properties.emplace(delta.property.key, delta.property.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
LOG(FATAL) << "Invalid accessor!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
break;
|
||||
}
|
||||
});
|
||||
if (deleted) {
|
||||
return Result<std::unordered_map<uint64_t, PropertyValue>>{
|
||||
Error::DELETED_OBJECT};
|
||||
|
Loading…
Reference in New Issue
Block a user