Split storage and vertex accessor into cpp in storage v2

Reviewers: mtomic, teon.banek

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2172
This commit is contained in:
Matej Ferencevic 2019-07-03 15:32:03 +02:00
parent 9e223cdabd
commit 6e6dff81e0
7 changed files with 480 additions and 419 deletions

View File

@ -11,6 +11,7 @@ add_subdirectory(communication)
add_subdirectory(stats) add_subdirectory(stats)
add_subdirectory(auth) add_subdirectory(auth)
add_subdirectory(slk) add_subdirectory(slk)
add_subdirectory(storage/v2)
# ---------------------------------------------------------------------------- # ----------------------------------------------------------------------------
# Common LCP files # Common LCP files

View File

@ -0,0 +1,6 @@
set(storage_v2_src_files
vertex_accessor.cpp
storage.cpp)
add_library(mg-storage-v2 STATIC ${storage_v2_src_files})
target_link_libraries(mg-storage-v2 Threads::Threads mg-utils glog gflags)

153
src/storage/v2/storage.cpp Normal file
View File

@ -0,0 +1,153 @@
#include "storage/v2/storage.hpp"
#include <memory>
#include <glog/logging.h>
#include "storage/v2/mvcc.hpp"
namespace storage {
Storage::Accessor::Accessor(Storage *storage)
: storage_(storage), is_transaction_starter_(true) {
// We acquire the storage lock here because we access (and modify) the
// transaction engine variables (`transaction_id` and `timestamp`) below.
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
auto acc = storage_->transactions_.access();
auto [it, inserted] = acc.insert(
Transaction{storage_->transaction_id_++, storage_->timestamp_++});
CHECK(inserted) << "The Transaction must be inserted here!";
CHECK(it != acc.end()) << "Invalid Transaction iterator!";
transaction_ = &*it;
}
Storage::Accessor::Accessor(Accessor &&other) noexcept
: storage_(other.storage_),
transaction_(other.transaction_),
is_transaction_starter_(true) {
CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
}
// This operator isn't `noexcept` because the `Abort` function isn't
// `noexcept`.
Storage::Accessor &Storage::Accessor::operator=(Accessor &&other) {
if (this == &other) return *this;
if (is_transaction_starter_ && transaction_->is_active) {
Abort();
}
storage_ = other.storage_;
transaction_ = other.transaction_;
is_transaction_starter_ = true;
CHECK(other.is_transaction_starter_) << "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
return *this;
}
Storage::Accessor::~Accessor() {
if (is_transaction_starter_ && transaction_->is_active) {
Abort();
}
}
VertexAccessor Storage::Accessor::CreateVertex() {
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(transaction_);
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
CHECK(inserted) << "The vertex must be inserted here!";
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
transaction_->modified_vertices.push_back(&*it);
return VertexAccessor::Create(&*it, transaction_, View::NEW).value();
}
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
View view) {
auto acc = storage_->vertices_.access();
auto it = acc.find(gid);
if (it == acc.end()) return std::nullopt;
return VertexAccessor::Create(&*it, transaction_, view);
}
void Storage::Accessor::AdvanceCommand() { ++transaction_->command_id; }
void Storage::Accessor::Commit() {
CHECK(!transaction_->must_abort) << "The transaction can't be committed!";
CHECK(transaction_->is_active) << "The transaction is already terminated!";
if (transaction_->deltas.empty()) {
transaction_->commit_timestamp.store(transaction_->start_timestamp,
std::memory_order_release);
} else {
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
transaction_->commit_timestamp.store(storage_->timestamp_++,
std::memory_order_release);
// TODO: release lock, and update all deltas to have an in-memory copy
// of the commit id
}
transaction_->is_active = false;
}
void Storage::Accessor::Abort() {
CHECK(transaction_->is_active) << "The transaction is already terminated!";
for (Vertex *vertex : transaction_->modified_vertices) {
std::lock_guard<utils::SpinLock> guard(vertex->lock);
Delta *current = vertex->delta;
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) ==
transaction_->transaction_id) {
switch (current->action) {
case Delta::Action::REMOVE_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->label);
CHECK(it != vertex->labels.end()) << "Invalid database state!";
std::swap(*it, *vertex->labels.rbegin());
vertex->labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->label);
CHECK(it == vertex->labels.end()) << "Invalid database state!";
vertex->labels.push_back(current->label);
break;
}
case Delta::Action::SET_PROPERTY: {
auto it = vertex->properties.find(current->property.key);
if (it != vertex->properties.end()) {
if (current->property.value.IsNull()) {
// remove the property
vertex->properties.erase(it);
} else {
// set the value
it->second = current->property.value;
}
} else if (!current->property.value.IsNull()) {
vertex->properties.emplace(current->property.key,
current->property.value);
}
break;
}
case Delta::Action::DELETE_OBJECT: {
auto acc = storage_->vertices_.access();
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
vertex->deleted = false;
break;
}
}
current = current->next.load(std::memory_order_acquire);
}
vertex->delta = current;
}
transaction_->is_active = false;
}
} // namespace storage

View File

@ -1,10 +1,7 @@
#pragma once #pragma once
#include <memory>
#include <optional> #include <optional>
#include <glog/logging.h>
#include "utils/skip_list.hpp" #include "utils/skip_list.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
@ -25,154 +22,28 @@ class Storage final {
public: public:
class Accessor final { class Accessor final {
public: public:
explicit Accessor(Storage *storage) explicit Accessor(Storage *storage);
: storage_(storage), is_transaction_starter_(true) {
// We acquire the storage lock here because we access (and modify) the
// transaction engine variables (`transaction_id` and `timestamp`) below.
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
auto acc = storage_->transactions_.access();
auto [it, inserted] = acc.insert(
Transaction{storage_->transaction_id_++, storage_->timestamp_++});
CHECK(inserted) << "The Transaction must be inserted here!";
CHECK(it != acc.end()) << "Invalid Transaction iterator!";
transaction_ = &*it;
}
Accessor(const Accessor &) = delete; Accessor(const Accessor &) = delete;
Accessor &operator=(const Accessor &) = delete; Accessor &operator=(const Accessor &) = delete;
Accessor(Accessor &&other) noexcept Accessor(Accessor &&other) noexcept;
: storage_(other.storage_),
transaction_(other.transaction_),
is_transaction_starter_(true) {
CHECK(other.is_transaction_starter_)
<< "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
}
// This operator isn't `noexcept` because the `Abort` function isn't // This operator isn't `noexcept` because the `Abort` function isn't
// `noexcept`. // `noexcept`.
Accessor &operator=(Accessor &&other) { Accessor &operator=(Accessor &&other);
if (this == &other) return *this;
if (is_transaction_starter_ && transaction_->is_active) { ~Accessor();
Abort();
}
storage_ = other.storage_; VertexAccessor CreateVertex();
transaction_ = other.transaction_;
is_transaction_starter_ = true;
CHECK(other.is_transaction_starter_) std::optional<VertexAccessor> FindVertex(Gid gid, View view);
<< "The original accessor isn't valid!";
// Don't allow the other accessor to abort our transaction.
other.is_transaction_starter_ = false;
return *this; void AdvanceCommand();
}
~Accessor() { void Commit();
if (is_transaction_starter_ && transaction_->is_active) {
Abort();
}
}
VertexAccessor CreateVertex() { void Abort();
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access();
auto delta = CreateDeleteObjectDelta(transaction_);
auto [it, inserted] =
acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
CHECK(inserted) << "The vertex must be inserted here!";
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
transaction_->modified_vertices.push_back(&*it);
return VertexAccessor::Create(&*it, transaction_, View::NEW).value();
}
std::optional<VertexAccessor> FindVertex(Gid gid, View view) {
auto acc = storage_->vertices_.access();
auto it = acc.find(gid);
if (it == acc.end()) return std::nullopt;
return VertexAccessor::Create(&*it, transaction_, view);
}
void AdvanceCommand() { ++transaction_->command_id; }
void Commit() {
CHECK(!transaction_->must_abort) << "The transaction can't be committed!";
CHECK(transaction_->is_active)
<< "The transaction is already terminated!";
if (transaction_->deltas.empty()) {
transaction_->commit_timestamp.store(transaction_->start_timestamp,
std::memory_order_release);
} else {
std::lock_guard<utils::SpinLock> guard(storage_->lock_);
transaction_->commit_timestamp.store(storage_->timestamp_++,
std::memory_order_release);
// TODO: release lock, and update all deltas to have an in-memory copy
// of the commit id
}
transaction_->is_active = false;
}
void Abort() {
CHECK(transaction_->is_active)
<< "The transaction is already terminated!";
for (Vertex *vertex : transaction_->modified_vertices) {
std::lock_guard<utils::SpinLock> guard(vertex->lock);
Delta *current = vertex->delta;
while (current != nullptr &&
current->timestamp->load(std::memory_order_acquire) ==
transaction_->transaction_id) {
switch (current->action) {
case Delta::Action::REMOVE_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->label);
CHECK(it != vertex->labels.end()) << "Invalid database state!";
std::swap(*it, *vertex->labels.rbegin());
vertex->labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
current->label);
CHECK(it == vertex->labels.end()) << "Invalid database state!";
vertex->labels.push_back(current->label);
break;
}
case Delta::Action::SET_PROPERTY: {
auto it = vertex->properties.find(current->property.key);
if (it != vertex->properties.end()) {
if (current->property.value.IsNull()) {
// remove the property
vertex->properties.erase(it);
} else {
// set the value
it->second = current->property.value;
}
} else if (!current->property.value.IsNull()) {
vertex->properties.emplace(current->property.key,
current->property.value);
}
break;
}
case Delta::Action::DELETE_OBJECT: {
auto acc = storage_->vertices_.access();
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
vertex->deleted = false;
break;
}
}
current = current->next.load(std::memory_order_acquire);
}
vertex->delta = current;
}
transaction_->is_active = false;
}
private: private:
Storage *storage_; Storage *storage_;

View File

@ -0,0 +1,301 @@
#include "storage/v2/vertex_accessor.hpp"
#include <memory>
#include "storage/v2/mvcc.hpp"
namespace storage {
std::optional<VertexAccessor> VertexAccessor::Create(Vertex *vertex,
Transaction *transaction,
View view) {
bool is_visible = true;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex->lock);
is_visible = !vertex->deleted;
delta = vertex->delta;
}
ApplyDeltasForRead(transaction, delta, view,
[&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
break;
case Delta::Action::RECREATE_OBJECT: {
is_visible = true;
break;
}
case Delta::Action::DELETE_OBJECT: {
is_visible = false;
break;
}
}
});
if (!is_visible) return std::nullopt;
return VertexAccessor{vertex, transaction};
}
Result<bool> VertexAccessor::Delete() {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{false};
CreateAndLinkDelta(transaction_, vertex_, Delta::RecreateObjectTag());
vertex_->deleted = true;
return Result<bool>{true};
}
Result<bool> VertexAccessor::AddLabel(uint64_t label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) !=
vertex_->labels.end())
return Result<bool>{false};
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
vertex_->labels.push_back(label);
return Result<bool>{true};
}
Result<bool> VertexAccessor::RemoveLabel(uint64_t label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return Result<bool>{false};
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label);
std::swap(*it, *vertex_->labels.rbegin());
vertex_->labels.pop_back();
return Result<bool>{true};
}
Result<bool> VertexAccessor::HasLabel(uint64_t label, View view) {
bool deleted = false;
bool has_label = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
has_label = std::find(vertex_->labels.begin(), vertex_->labels.end(),
label) != vertex_->labels.end();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view,
[&deleted, &has_label, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
if (delta.label == label) {
CHECK(has_label) << "Invalid database state!";
has_label = false;
}
break;
}
case Delta::Action::ADD_LABEL: {
if (delta.label == label) {
CHECK(!has_label) << "Invalid database state!";
has_label = true;
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
break;
}
});
if (deleted) return Result<bool>{Error::DELETED_OBJECT};
return Result<bool>{has_label};
}
Result<std::vector<uint64_t>> VertexAccessor::Labels(View view) {
bool deleted = false;
std::vector<uint64_t> labels;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
labels = vertex_->labels;
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&deleted, &labels](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
// Remove the label because we don't see the addition.
auto it = std::find(labels.begin(), labels.end(), delta.label);
CHECK(it != labels.end()) << "Invalid database state!";
std::swap(*it, *labels.rbegin());
labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
// Add the label because we don't see the removal.
auto it = std::find(labels.begin(), labels.end(), delta.label);
CHECK(it == labels.end()) << "Invalid database state!";
labels.push_back(delta.label);
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
break;
}
});
if (deleted) return Result<std::vector<uint64_t>>{Error::DELETED_OBJECT};
return Result<std::vector<uint64_t>>{std::move(labels)};
}
Result<bool> VertexAccessor::SetProperty(uint64_t property,
const PropertyValue &value) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
auto it = vertex_->properties.find(property);
bool existed = it != vertex_->properties.end();
if (it != vertex_->properties.end()) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property,
it->second);
if (value.IsNull()) {
// remove the property
vertex_->properties.erase(it);
} else {
// set the value
it->second = value;
}
} else {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(), property,
PropertyValue());
if (!value.IsNull()) {
vertex_->properties.emplace(property, value);
}
}
return Result<bool>{existed};
}
Result<PropertyValue> VertexAccessor::GetProperty(uint64_t property,
View view) {
bool deleted = false;
PropertyValue value;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
auto it = vertex_->properties.find(property);
if (it != vertex_->properties.end()) {
value = it->second;
}
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view,
[&deleted, &value, property](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
if (delta.property.key == property) {
value = delta.property.value;
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
break;
}
});
if (deleted) return Result<PropertyValue>{Error::DELETED_OBJECT};
return Result<PropertyValue>{std::move(value)};
}
Result<std::unordered_map<uint64_t, PropertyValue>> VertexAccessor::Properties(
View view) {
std::unordered_map<uint64_t, PropertyValue> properties;
bool deleted = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
properties = vertex_->properties;
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&deleted, &properties](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
auto it = properties.find(delta.property.key);
if (it != properties.end()) {
if (delta.property.value.IsNull()) {
// remove the property
properties.erase(it);
} else {
// set the value
it->second = delta.property.value;
}
} else if (!delta.property.value.IsNull()) {
properties.emplace(delta.property.key, delta.property.value);
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
break;
}
});
if (deleted) {
return Result<std::unordered_map<uint64_t, PropertyValue>>{
Error::DELETED_OBJECT};
}
return Result<std::unordered_map<uint64_t, PropertyValue>>{
std::move(properties)};
}
} // namespace storage

View File

@ -1,11 +1,9 @@
#pragma once #pragma once
#include <memory>
#include <optional> #include <optional>
#include "storage/v2/vertex.hpp" #include "storage/v2/vertex.hpp"
#include "storage/v2/mvcc.hpp"
#include "storage/v2/result.hpp" #include "storage/v2/result.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "storage/v2/view.hpp" #include "storage/v2/view.hpp"
@ -20,292 +18,23 @@ class VertexAccessor final {
public: public:
static std::optional<VertexAccessor> Create(Vertex *vertex, static std::optional<VertexAccessor> Create(Vertex *vertex,
Transaction *transaction, Transaction *transaction,
View view) { View view);
bool is_visible = true;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex->lock);
is_visible = !vertex->deleted;
delta = vertex->delta;
}
ApplyDeltasForRead(transaction, delta, view,
[&is_visible](const Delta &delta) {
switch (delta.action) {
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
case Delta::Action::SET_PROPERTY:
break;
case Delta::Action::RECREATE_OBJECT: {
is_visible = true;
break;
}
case Delta::Action::DELETE_OBJECT: {
is_visible = false;
break;
}
}
});
if (!is_visible) return std::nullopt;
return VertexAccessor{vertex, transaction};
}
Result<bool> Delete() { Result<bool> Delete();
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) Result<bool> AddLabel(uint64_t label);
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{false}; Result<bool> RemoveLabel(uint64_t label);
CreateAndLinkDelta(transaction_, vertex_, Delta::RecreateObjectTag()); Result<bool> HasLabel(uint64_t label, View view);
vertex_->deleted = true; Result<std::vector<uint64_t>> Labels(View view);
return Result<bool>{true}; Result<bool> SetProperty(uint64_t property, const PropertyValue &value);
}
Result<bool> AddLabel(uint64_t label) { Result<PropertyValue> GetProperty(uint64_t property, View view);
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_)) Result<std::unordered_map<uint64_t, PropertyValue>> Properties(View view);
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
if (std::find(vertex_->labels.begin(), vertex_->labels.end(), label) !=
vertex_->labels.end())
return Result<bool>{false};
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
vertex_->labels.push_back(label);
return Result<bool>{true};
}
Result<bool> RemoveLabel(uint64_t label) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
auto it = std::find(vertex_->labels.begin(), vertex_->labels.end(), label);
if (it == vertex_->labels.end()) return Result<bool>{false};
CreateAndLinkDelta(transaction_, vertex_, Delta::AddLabelTag(), label);
std::swap(*it, *vertex_->labels.rbegin());
vertex_->labels.pop_back();
return Result<bool>{true};
}
Result<bool> HasLabel(uint64_t label, View view) {
bool deleted = false;
bool has_label = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
has_label = std::find(vertex_->labels.begin(), vertex_->labels.end(),
label) != vertex_->labels.end();
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view,
[&deleted, &has_label, label](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
if (delta.label == label) {
CHECK(has_label) << "Invalid database state!";
has_label = false;
}
break;
}
case Delta::Action::ADD_LABEL: {
if (delta.label == label) {
CHECK(!has_label) << "Invalid database state!";
has_label = true;
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
break;
}
});
if (deleted) return Result<bool>{Error::DELETED_OBJECT};
return Result<bool>{has_label};
}
Result<std::vector<uint64_t>> Labels(View view) {
bool deleted = false;
std::vector<uint64_t> labels;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
labels = vertex_->labels;
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&deleted, &labels](const Delta &delta) {
switch (delta.action) {
case Delta::Action::REMOVE_LABEL: {
// Remove the label because we don't see the addition.
auto it = std::find(labels.begin(), labels.end(), delta.label);
CHECK(it != labels.end()) << "Invalid database state!";
std::swap(*it, *labels.rbegin());
labels.pop_back();
break;
}
case Delta::Action::ADD_LABEL: {
// Add the label because we don't see the removal.
auto it = std::find(labels.begin(), labels.end(), delta.label);
CHECK(it == labels.end()) << "Invalid database state!";
labels.push_back(delta.label);
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::SET_PROPERTY:
break;
}
});
if (deleted) return Result<std::vector<uint64_t>>{Error::DELETED_OBJECT};
return Result<std::vector<uint64_t>>{std::move(labels)};
}
Result<bool> SetProperty(uint64_t property, const PropertyValue &value) {
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
if (!PrepareForWrite(transaction_, vertex_))
return Result<bool>{Error::SERIALIZATION_ERROR};
if (vertex_->deleted) return Result<bool>{Error::DELETED_OBJECT};
auto it = vertex_->properties.find(property);
bool existed = it != vertex_->properties.end();
if (it != vertex_->properties.end()) {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(),
property, it->second);
if (value.IsNull()) {
// remove the property
vertex_->properties.erase(it);
} else {
// set the value
it->second = value;
}
} else {
CreateAndLinkDelta(transaction_, vertex_, Delta::SetPropertyTag(),
property, PropertyValue());
if (!value.IsNull()) {
vertex_->properties.emplace(property, value);
}
}
return Result<bool>{existed};
}
Result<PropertyValue> GetProperty(uint64_t property, View view) {
bool deleted = false;
PropertyValue value;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
auto it = vertex_->properties.find(property);
if (it != vertex_->properties.end()) {
value = it->second;
}
delta = vertex_->delta;
}
ApplyDeltasForRead(transaction_, delta, view,
[&deleted, &value, property](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
if (delta.property.key == property) {
value = delta.property.value;
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
break;
}
});
if (deleted) return Result<PropertyValue>{Error::DELETED_OBJECT};
return Result<PropertyValue>{std::move(value)};
}
Result<std::unordered_map<uint64_t, PropertyValue>> Properties(View view) {
std::unordered_map<uint64_t, PropertyValue> properties;
bool deleted = false;
Delta *delta = nullptr;
{
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted;
properties = vertex_->properties;
delta = vertex_->delta;
}
ApplyDeltasForRead(
transaction_, delta, view, [&deleted, &properties](const Delta &delta) {
switch (delta.action) {
case Delta::Action::SET_PROPERTY: {
auto it = properties.find(delta.property.key);
if (it != properties.end()) {
if (delta.property.value.IsNull()) {
// remove the property
properties.erase(it);
} else {
// set the value
it->second = delta.property.value;
}
} else if (!delta.property.value.IsNull()) {
properties.emplace(delta.property.key, delta.property.value);
}
break;
}
case Delta::Action::DELETE_OBJECT: {
LOG(FATAL) << "Invalid accessor!";
break;
}
case Delta::Action::RECREATE_OBJECT: {
deleted = false;
break;
}
case Delta::Action::ADD_LABEL:
case Delta::Action::REMOVE_LABEL:
break;
}
});
if (deleted) {
return Result<std::unordered_map<uint64_t, PropertyValue>>{
Error::DELETED_OBJECT};
}
return Result<std::unordered_map<uint64_t, PropertyValue>>{
std::move(properties)};
}
Gid Gid() const { return vertex_->gid; } Gid Gid() const { return vertex_->gid; }

View File

@ -383,7 +383,7 @@ add_unit_test(property_value_v2.cpp)
target_link_libraries(${test_prefix}property_value_v2 mg-utils) target_link_libraries(${test_prefix}property_value_v2 mg-utils)
add_unit_test(storage_v2.cpp) add_unit_test(storage_v2.cpp)
target_link_libraries(${test_prefix}storage_v2 mg-utils) target_link_libraries(${test_prefix}storage_v2 mg-storage-v2)
# Test LCP # Test LCP