Fix build of storage library.

This commit is contained in:
Aidar Samerkhanov 2023-04-12 20:53:48 +00:00
parent c64b607478
commit 3c0ae50a15
15 changed files with 693 additions and 573 deletions

View File

@ -356,7 +356,7 @@ class DbAccessor final {
return EdgeAccessor(*maybe_edge); return EdgeAccessor(*maybe_edge);
} }
storage::Result<std::optional<EdgeAccessor>> RemoveEdge(EdgeAccessor *edge) { storage::Result<std::unique_ptr<EdgeAccessor>> RemoveEdge(EdgeAccessor *edge) {
auto res = accessor_->DeleteEdge(&edge->impl_); auto res = accessor_->DeleteEdge(&edge->impl_);
if (res.HasError()) { if (res.HasError()) {
return res.GetError(); return res.GetError();

View File

@ -776,16 +776,16 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
const auto &in_edges = maybe_in_edges.GetValue(); const auto &in_edges = maybe_in_edges.GetValue();
snapshot.WriteUint(in_edges.size()); snapshot.WriteUint(in_edges.size());
for (const auto &item : in_edges) { for (const auto &item : in_edges) {
snapshot.WriteUint(item.Gid().AsUint()); snapshot.WriteUint(item->Gid().AsUint());
snapshot.WriteUint(item.FromVertex()->Gid().AsUint()); snapshot.WriteUint(item->FromVertex()->Gid().AsUint());
write_mapping(item.EdgeType()); write_mapping(item->EdgeType());
} }
const auto &out_edges = maybe_out_edges.GetValue(); const auto &out_edges = maybe_out_edges.GetValue();
snapshot.WriteUint(out_edges.size()); snapshot.WriteUint(out_edges.size());
for (const auto &item : out_edges) { for (const auto &item : out_edges) {
snapshot.WriteUint(item.Gid().AsUint()); snapshot.WriteUint(item->Gid().AsUint());
snapshot.WriteUint(item.ToVertex()->Gid().AsUint()); snapshot.WriteUint(item->ToVertex()->Gid().AsUint());
write_mapping(item.EdgeType()); write_mapping(item->EdgeType());
} }
} }

View File

@ -68,7 +68,7 @@ class EdgeAccessor {
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual Result<std::map<PropertyId, PropertyValue>> Properties(View view) const = 0; virtual Result<std::map<PropertyId, PropertyValue>> Properties(View view) const = 0;
virtual class Gid Gid() const noexcept = 0; virtual storage::Gid Gid() const noexcept = 0;
virtual bool IsCycle() const = 0; virtual bool IsCycle() const = 0;

View File

@ -30,9 +30,9 @@ class VertexAccessor;
struct Indices; struct Indices;
struct Constraints; struct Constraints;
class InMemoryEdgeAccessor : public EdgeAccessor { class InMemoryEdgeAccessor final : public EdgeAccessor {
private: private:
friend class Storage; friend class InMemoryStorage;
public: public:
InMemoryEdgeAccessor(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex, InMemoryEdgeAccessor(EdgeRef edge, EdgeTypeId edge_type, Vertex *from_vertex, Vertex *to_vertex,
@ -73,7 +73,7 @@ class InMemoryEdgeAccessor : public EdgeAccessor {
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<std::map<PropertyId, PropertyValue>> Properties(View view) const override; Result<std::map<PropertyId, PropertyValue>> Properties(View view) const override;
class Gid Gid() const noexcept override { storage::Gid Gid() const noexcept override {
if (config_.properties_on_edges) { if (config_.properties_on_edges) {
return edge_.ptr->gid; return edge_.ptr->gid;
} else { } else {
@ -83,10 +83,12 @@ class InMemoryEdgeAccessor : public EdgeAccessor {
bool IsCycle() const override { return from_vertex_ == to_vertex_; } bool IsCycle() const override { return from_vertex_ == to_vertex_; }
bool operator==(const InMemoryEdgeAccessor &other) const noexcept { bool operator==(const EdgeAccessor &other) const noexcept override {
return edge_ == other.edge_ && transaction_ == other.transaction_; const auto *otherEdge = dynamic_cast<const InMemoryEdgeAccessor *>(&other);
if (otherEdge == nullptr) return false;
return edge_ == otherEdge->edge_ && transaction_ == otherEdge->transaction_;
} }
bool operator!=(const InMemoryEdgeAccessor &other) const noexcept { return !(*this == other); } bool operator!=(const EdgeAccessor &other) const noexcept { return !(*this == other); }
private: private:
EdgeRef edge_; EdgeRef edge_;

View File

@ -27,6 +27,8 @@
#include "storage/v2/durability/wal.hpp" #include "storage/v2/durability/wal.hpp"
#include "storage/v2/edge_accessor.hpp" #include "storage/v2/edge_accessor.hpp"
#include "storage/v2/indices.hpp" #include "storage/v2/indices.hpp"
#include "storage/v2/inmemory/edge_accessor.hpp"
#include "storage/v2/inmemory/vertex_accessor.hpp"
#include "storage/v2/mvcc.hpp" #include "storage/v2/mvcc.hpp"
#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/config.hpp"
#include "storage/v2/replication/enums.hpp" #include "storage/v2/replication/enums.hpp"
@ -55,25 +57,25 @@ using OOMExceptionEnabler = utils::MemoryTracker::OutOfMemoryExceptionEnabler;
namespace { namespace {
inline constexpr uint16_t kEpochHistoryRetention = 1000; inline constexpr uint16_t kEpochHistoryRetention = 1000;
std::string RegisterReplicaErrorToString(Storage::RegisterReplicaError error) { std::string RegisterReplicaErrorToString(InMemoryStorage::RegisterReplicaError error) {
switch (error) { switch (error) {
case Storage::RegisterReplicaError::NAME_EXISTS: case InMemoryStorage::RegisterReplicaError::NAME_EXISTS:
return "NAME_EXISTS"; return "NAME_EXISTS";
case Storage::RegisterReplicaError::END_POINT_EXISTS: case InMemoryStorage::RegisterReplicaError::END_POINT_EXISTS:
return "END_POINT_EXISTS"; return "END_POINT_EXISTS";
case Storage::RegisterReplicaError::CONNECTION_FAILED: case InMemoryStorage::RegisterReplicaError::CONNECTION_FAILED:
return "CONNECTION_FAILED"; return "CONNECTION_FAILED";
case Storage::RegisterReplicaError::COULD_NOT_BE_PERSISTED: case InMemoryStorage::RegisterReplicaError::COULD_NOT_BE_PERSISTED:
return "COULD_NOT_BE_PERSISTED"; return "COULD_NOT_BE_PERSISTED";
} }
} }
} // namespace } // namespace
auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipList<Vertex>::Iterator end, auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipList<Vertex>::Iterator end,
VertexAccessor *vertex, Transaction *tx, View view, Indices *indices, std::unique_ptr<VertexAccessor> &vertex, Transaction *tx, View view, Indices *indices,
Constraints *constraints, Config::Items config) { Constraints *constraints, Config::Items config) {
while (it != end) { while (it != end) {
vertex = VertexAccessor::Create(&*it, tx, indices, constraints, config, view).get(); vertex = VertexAccessor::Create(&*it, tx, indices, constraints, config, view);
if (!vertex) { if (!vertex) {
++it; ++it;
continue; continue;
@ -85,235 +87,19 @@ auto AdvanceToVisibleVertex(utils::SkipList<Vertex>::Iterator it, utils::SkipLis
AllVerticesIterable::Iterator::Iterator(AllVerticesIterable *self, utils::SkipList<Vertex>::Iterator it) AllVerticesIterable::Iterator::Iterator(AllVerticesIterable *self, utils::SkipList<Vertex>::Iterator it)
: self_(self), : self_(self),
it_(AdvanceToVisibleVertex(it, self->vertices_accessor_.end(), &self->vertex_, self->transaction_, self->view_, it_(AdvanceToVisibleVertex(it, self->vertices_accessor_.end(), self->vertex_, self->transaction_, self->view_,
self->indices_, self_->constraints_, self->config_)) {} self->indices_, self_->constraints_, self->config_)) {}
VertexAccessor AllVerticesIterable::Iterator::operator*() const { return *self_->vertex_; } VertexAccessor *AllVerticesIterable::Iterator::operator*() const { return self_->vertex_.get(); }
AllVerticesIterable::Iterator &AllVerticesIterable::Iterator::operator++() { AllVerticesIterable::Iterator &AllVerticesIterable::Iterator::operator++() {
++it_; ++it_;
it_ = AdvanceToVisibleVertex(it_, self_->vertices_accessor_.end(), &self_->vertex_, self_->transaction_, self_->view_, it_ = AdvanceToVisibleVertex(it_, self_->vertices_accessor_.end(), self_->vertex_, self_->transaction_, self_->view_,
self_->indices_, self_->constraints_, self_->config_); self_->indices_, self_->constraints_, self_->config_);
return *this; return *this;
} }
VerticesIterable::VerticesIterable(AllVerticesIterable vertices) : type_(Type::ALL) { InMemoryStorage::InMemoryStorage(Config config)
new (&all_vertices_) AllVerticesIterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(LabelIndex::Iterable vertices) : type_(Type::BY_LABEL) {
new (&vertices_by_label_) LabelIndex::Iterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(LabelPropertyIndex::Iterable vertices) : type_(Type::BY_LABEL_PROPERTY) {
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(VerticesIterable &&other) noexcept : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_vertices_) AllVerticesIterable(std::move(other.all_vertices_));
break;
case Type::BY_LABEL:
new (&vertices_by_label_) LabelIndex::Iterable(std::move(other.vertices_by_label_));
break;
case Type::BY_LABEL_PROPERTY:
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(other.vertices_by_label_property_));
break;
}
}
VerticesIterable &VerticesIterable::operator=(VerticesIterable &&other) noexcept {
switch (type_) {
case Type::ALL:
all_vertices_.AllVerticesIterable::~AllVerticesIterable();
break;
case Type::BY_LABEL:
vertices_by_label_.LabelIndex::Iterable::~Iterable();
break;
case Type::BY_LABEL_PROPERTY:
vertices_by_label_property_.LabelPropertyIndex::Iterable::~Iterable();
break;
}
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_vertices_) AllVerticesIterable(std::move(other.all_vertices_));
break;
case Type::BY_LABEL:
new (&vertices_by_label_) LabelIndex::Iterable(std::move(other.vertices_by_label_));
break;
case Type::BY_LABEL_PROPERTY:
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(other.vertices_by_label_property_));
break;
}
return *this;
}
VerticesIterable::~VerticesIterable() {
switch (type_) {
case Type::ALL:
all_vertices_.AllVerticesIterable::~AllVerticesIterable();
break;
case Type::BY_LABEL:
vertices_by_label_.LabelIndex::Iterable::~Iterable();
break;
case Type::BY_LABEL_PROPERTY:
vertices_by_label_property_.LabelPropertyIndex::Iterable::~Iterable();
break;
}
}
VerticesIterable::Iterator VerticesIterable::begin() {
switch (type_) {
case Type::ALL:
return Iterator(all_vertices_.begin());
case Type::BY_LABEL:
return Iterator(vertices_by_label_.begin());
case Type::BY_LABEL_PROPERTY:
return Iterator(vertices_by_label_property_.begin());
}
}
VerticesIterable::Iterator VerticesIterable::end() {
switch (type_) {
case Type::ALL:
return Iterator(all_vertices_.end());
case Type::BY_LABEL:
return Iterator(vertices_by_label_.end());
case Type::BY_LABEL_PROPERTY:
return Iterator(vertices_by_label_property_.end());
}
}
VerticesIterable::Iterator::Iterator(AllVerticesIterable::Iterator it) : type_(Type::ALL) {
new (&all_it_) AllVerticesIterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(LabelIndex::Iterable::Iterator it) : type_(Type::BY_LABEL) {
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(LabelPropertyIndex::Iterable::Iterator it) : type_(Type::BY_LABEL_PROPERTY) {
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(const VerticesIterable::Iterator &other) : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(other.all_it_);
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(other.by_label_it_);
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(other.by_label_property_it_);
break;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator=(const VerticesIterable::Iterator &other) {
Destroy();
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(other.all_it_);
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(other.by_label_it_);
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(other.by_label_property_it_);
break;
}
return *this;
}
VerticesIterable::Iterator::Iterator(VerticesIterable::Iterator &&other) noexcept : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(std::move(other.all_it_));
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(other.by_label_it_));
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(other.by_label_property_it_));
break;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator=(VerticesIterable::Iterator &&other) noexcept {
Destroy();
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(std::move(other.all_it_));
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(other.by_label_it_));
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(other.by_label_property_it_));
break;
}
return *this;
}
VerticesIterable::Iterator::~Iterator() { Destroy(); }
void VerticesIterable::Iterator::Destroy() noexcept {
switch (type_) {
case Type::ALL:
all_it_.AllVerticesIterable::Iterator::~Iterator();
break;
case Type::BY_LABEL:
by_label_it_.LabelIndex::Iterable::Iterator::~Iterator();
break;
case Type::BY_LABEL_PROPERTY:
by_label_property_it_.LabelPropertyIndex::Iterable::Iterator::~Iterator();
break;
}
}
VertexAccessor VerticesIterable::Iterator::operator*() const {
switch (type_) {
case Type::ALL:
return *all_it_;
case Type::BY_LABEL:
return *by_label_it_;
case Type::BY_LABEL_PROPERTY:
return *by_label_property_it_;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator++() {
switch (type_) {
case Type::ALL:
++all_it_;
break;
case Type::BY_LABEL:
++by_label_it_;
break;
case Type::BY_LABEL_PROPERTY:
++by_label_property_it_;
break;
}
return *this;
}
bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
switch (type_) {
case Type::ALL:
return all_it_ == other.all_it_;
case Type::BY_LABEL:
return by_label_it_ == other.by_label_it_;
case Type::BY_LABEL_PROPERTY:
return by_label_property_it_ == other.by_label_property_it_;
}
}
Storage::Storage(Config config)
: indices_(&constraints_, config.items), : indices_(&constraints_, config.items),
isolation_level_(config.transaction.isolation_level), isolation_level_(config.transaction.isolation_level),
config_(config), config_(config),
@ -429,7 +215,7 @@ Storage::Storage(Config config)
} }
} }
Storage::~Storage() { InMemoryStorage::~InMemoryStorage() {
if (config_.gc.type == Config::Gc::Type::PERIODIC) { if (config_.gc.type == Config::Gc::Type::PERIODIC) {
gc_runner_.Stop(); gc_runner_.Stop();
} }
@ -456,8 +242,9 @@ Storage::~Storage() {
} }
} }
Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level) InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryStorage *storage, IsolationLevel isolation_level)
: storage_(storage), : Accessor(),
storage_(storage),
// The lock must be acquired before creating the transaction object to // The lock must be acquired before creating the transaction object to
// prevent freshly created transactions from dangling in an active state // prevent freshly created transactions from dangling in an active state
// during exclusive operations. // during exclusive operations.
@ -466,8 +253,9 @@ Storage::Accessor::Accessor(Storage *storage, IsolationLevel isolation_level)
is_transaction_active_(true), is_transaction_active_(true),
config_(storage->config_.items) {} config_(storage->config_.items) {}
Storage::Accessor::Accessor(Accessor &&other) noexcept InMemoryStorage::InMemoryAccessor::InMemoryAccessor(InMemoryAccessor &&other) noexcept
: storage_(other.storage_), : Accessor(),
storage_(other.storage_),
storage_guard_(std::move(other.storage_guard_)), storage_guard_(std::move(other.storage_guard_)),
transaction_(std::move(other.transaction_)), transaction_(std::move(other.transaction_)),
commit_timestamp_(other.commit_timestamp_), commit_timestamp_(other.commit_timestamp_),
@ -478,7 +266,7 @@ Storage::Accessor::Accessor(Accessor &&other) noexcept
other.commit_timestamp_.reset(); other.commit_timestamp_.reset();
} }
Storage::Accessor::~Accessor() { InMemoryStorage::InMemoryAccessor::~InMemoryAccessor() {
if (is_transaction_active_) { if (is_transaction_active_) {
Abort(); Abort();
} }
@ -486,7 +274,7 @@ Storage::Accessor::~Accessor() {
FinalizeTransaction(); FinalizeTransaction();
} }
VertexAccessor Storage::Accessor::CreateVertex() { std::unique_ptr<VertexAccessor> InMemoryStorage::InMemoryAccessor::CreateVertex() {
OOMExceptionEnabler oom_exception; OOMExceptionEnabler oom_exception;
auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel); auto gid = storage_->vertex_id_.fetch_add(1, std::memory_order_acq_rel);
auto acc = storage_->vertices_.access(); auto acc = storage_->vertices_.access();
@ -495,10 +283,11 @@ VertexAccessor Storage::Accessor::CreateVertex() {
MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it); delta->prev.Set(&*it);
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); return std::make_unique<InMemoryVertexAccessor>(&*it, &transaction_, &storage_->indices_, &storage_->constraints_,
config_);
} }
VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) { std::unique_ptr<VertexAccessor> InMemoryStorage::InMemoryAccessor::CreateVertex(storage::Gid gid) {
OOMExceptionEnabler oom_exception; OOMExceptionEnabler oom_exception;
// NOTE: When we update the next `vertex_id_` here we perform a RMW // NOTE: When we update the next `vertex_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
@ -514,28 +303,31 @@ VertexAccessor Storage::Accessor::CreateVertex(storage::Gid gid) {
MG_ASSERT(inserted, "The vertex must be inserted here!"); MG_ASSERT(inserted, "The vertex must be inserted here!");
MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!"); MG_ASSERT(it != acc.end(), "Invalid Vertex accessor!");
delta->prev.Set(&*it); delta->prev.Set(&*it);
return VertexAccessor(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_); return std::make_unique<InMemoryVertexAccessor>(&*it, &transaction_, &storage_->indices_, &storage_->constraints_,
config_);
} }
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid, View view) { std::unique_ptr<VertexAccessor> InMemoryStorage::InMemoryAccessor::FindVertex(storage::Gid gid, View view) {
auto acc = storage_->vertices_.access(); auto acc = storage_->vertices_.access();
auto it = acc.find(gid); auto it = acc.find(gid);
if (it == acc.end()) return std::nullopt; if (it == acc.end()) return std::unique_ptr<VertexAccessor>();
return VertexAccessor::Create(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_, view); return VertexAccessor::Create(&*it, &transaction_, &storage_->indices_, &storage_->constraints_, config_, view);
} }
Result<std::optional<VertexAccessor>> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) { Result<std::unique_ptr<VertexAccessor>> InMemoryStorage::InMemoryAccessor::DeleteVertex(VertexAccessor *vertex) {
MG_ASSERT(vertex->transaction_ == &transaction_, auto *inMemoryVA = dynamic_cast<InMemoryVertexAccessor *>(vertex);
MG_ASSERT(inMemoryVA, "VertexAccessor must be from the same storage as the storage accessor when deleting a vertex!");
MG_ASSERT(inMemoryVA->transaction_ == &transaction_,
"VertexAccessor must be from the same transaction as the storage " "VertexAccessor must be from the same transaction as the storage "
"accessor when deleting a vertex!"); "accessor when deleting a vertex!");
auto *vertex_ptr = vertex->vertex_; auto *vertex_ptr = inMemoryVA->vertex_;
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock); std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;
if (vertex_ptr->deleted) { if (vertex_ptr->deleted) {
return std::optional<VertexAccessor>{}; return Result<std::unique_ptr<VertexAccessor>>{std::unique_ptr<InMemoryVertexAccessor>()};
} }
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES; if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty()) return Error::VERTEX_HAS_EDGES;
@ -543,18 +335,19 @@ Result<std::optional<VertexAccessor>> Storage::Accessor::DeleteVertex(VertexAcce
CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag()); CreateAndLinkDelta(&transaction_, vertex_ptr, Delta::RecreateObjectTag());
vertex_ptr->deleted = true; vertex_ptr->deleted = true;
return std::make_optional<VertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, return Result<std::unique_ptr<VertexAccessor>>{std::make_unique<InMemoryVertexAccessor>(
config_, true); vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true)};
} }
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Storage::Accessor::DetachDeleteVertex( Result<std::optional<std::pair<std::unique_ptr<VertexAccessor>, std::vector<std::unique_ptr<EdgeAccessor>>>>>
VertexAccessor *vertex) { InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
using ReturnType = std::pair<VertexAccessor, std::vector<EdgeAccessor>>; using ReturnType = std::pair<std::unique_ptr<VertexAccessor>, std::vector<std::unique_ptr<EdgeAccessor>>>;
auto *inMemoryVA = dynamic_cast<InMemoryVertexAccessor *>(vertex);
MG_ASSERT(vertex->transaction_ == &transaction_, MG_ASSERT(inMemoryVA, "VertexAccessor must be from the same storage as the storage accessor when deleting a vertex!");
MG_ASSERT(inMemoryVA->transaction_ == &transaction_,
"VertexAccessor must be from the same transaction as the storage " "VertexAccessor must be from the same transaction as the storage "
"accessor when deleting a vertex!"); "accessor when deleting a vertex!");
auto *vertex_ptr = vertex->vertex_; auto *vertex_ptr = inMemoryVA->vertex_;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges; std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges; std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
@ -570,11 +363,11 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
out_edges = vertex_ptr->out_edges; out_edges = vertex_ptr->out_edges;
} }
std::vector<EdgeAccessor> deleted_edges; std::vector<std::unique_ptr<EdgeAccessor>> deleted_edges;
for (const auto &item : in_edges) { for (const auto &item : in_edges) {
auto [edge_type, from_vertex, edge] = item; auto [edge_type, from_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_, InMemoryEdgeAccessor e(edge, edge_type, from_vertex, vertex_ptr, &transaction_, &storage_->indices_,
&storage_->constraints_, config_); &storage_->constraints_, config_);
auto ret = DeleteEdge(&e); auto ret = DeleteEdge(&e);
if (ret.HasError()) { if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
@ -582,13 +375,13 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
} }
if (ret.GetValue()) { if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue()); deleted_edges.emplace_back(std::move(ret.GetValue()));
} }
} }
for (const auto &item : out_edges) { for (const auto &item : out_edges) {
auto [edge_type, to_vertex, edge] = item; auto [edge_type, to_vertex, edge] = item;
EdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, InMemoryEdgeAccessor e(edge, edge_type, vertex_ptr, to_vertex, &transaction_, &storage_->indices_,
config_); &storage_->constraints_, config_);
auto ret = DeleteEdge(&e); auto ret = DeleteEdge(&e);
if (ret.HasError()) { if (ret.HasError()) {
MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!"); MG_ASSERT(ret.GetError() == Error::SERIALIZATION_ERROR, "Invalid database state!");
@ -596,7 +389,7 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
} }
if (ret.GetValue()) { if (ret.GetValue()) {
deleted_edges.push_back(*ret.GetValue()); deleted_edges.emplace_back(std::move(ret.GetValue()));
} }
} }
@ -614,21 +407,30 @@ Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> Stor
vertex_ptr->deleted = true; vertex_ptr->deleted = true;
return std::make_optional<ReturnType>( return std::make_optional<ReturnType>(
VertexAccessor{vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_, config_, true}, std::make_unique<InMemoryVertexAccessor>(vertex_ptr, &transaction_, &storage_->indices_, &storage_->constraints_,
config_, true),
std::move(deleted_edges)); std::move(deleted_edges));
} }
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) { Result<std::unique_ptr<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from,
VertexAccessor *to,
EdgeTypeId edge_type) {
auto *inMemoryVAFrom = dynamic_cast<InMemoryVertexAccessor *>(from);
auto *inMemoryVATo = dynamic_cast<InMemoryVertexAccessor *>(to);
MG_ASSERT(inMemoryVAFrom,
"Source VertexAccessor must be from the same storage as the storage accessor when creating an edge!");
MG_ASSERT(inMemoryVATo,
"Target VertexAccessor must be from the same storage as the storage accessor when creating an edge!");
OOMExceptionEnabler oom_exception; OOMExceptionEnabler oom_exception;
MG_ASSERT(from->transaction_ == to->transaction_, MG_ASSERT(inMemoryVAFrom->transaction_ == inMemoryVATo->transaction_,
"VertexAccessors must be from the same transaction when creating " "VertexAccessors must be from the same transaction when creating "
"an edge!"); "an edge!");
MG_ASSERT(from->transaction_ == &transaction_, MG_ASSERT(inMemoryVAFrom->transaction_ == &transaction_,
"VertexAccessors must be from the same transaction in when " "VertexAccessors must be from the same transaction in when "
"creating an edge!"); "creating an edge!");
auto from_vertex = from->vertex_; auto from_vertex = inMemoryVAFrom->vertex_;
auto to_vertex = to->vertex_; auto to_vertex = inMemoryVATo->vertex_;
// Obtain the locks by `gid` order to avoid lock cycles. // Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock); std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
@ -673,22 +475,31 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
// Increment edge count. // Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, return Result<std::unique_ptr<EdgeAccessor>>{std::make_unique<InMemoryEdgeAccessor>(
&storage_->constraints_, config_); edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, config_)};
} }
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, Result<std::unique_ptr<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccessor *from,
storage::Gid gid) { VertexAccessor *to,
EdgeTypeId edge_type,
storage::Gid gid) {
auto *inMemoryVAFrom = dynamic_cast<InMemoryVertexAccessor *>(from);
auto *inMemoryVATo = dynamic_cast<InMemoryVertexAccessor *>(to);
MG_ASSERT(inMemoryVAFrom,
"Source VertexAccessor must be from the same storage as the storage accessor when creating an edge!");
MG_ASSERT(inMemoryVATo,
"Target VertexAccessor must be from the same storage as the storage accessor when creating an edge!");
OOMExceptionEnabler oom_exception; OOMExceptionEnabler oom_exception;
MG_ASSERT(from->transaction_ == to->transaction_, MG_ASSERT(inMemoryVAFrom->transaction_ == inMemoryVATo->transaction_,
"VertexAccessors must be from the same transaction when creating " "VertexAccessors must be from the same transaction when creating "
"an edge!"); "an edge!");
MG_ASSERT(from->transaction_ == &transaction_, MG_ASSERT(inMemoryVAFrom->transaction_ == &transaction_,
"VertexAccessors must be from the same transaction in when " "VertexAccessors must be from the same transaction in when "
"creating an edge!"); "creating an edge!");
auto from_vertex = from->vertex_; auto from_vertex = inMemoryVAFrom->vertex_;
auto to_vertex = to->vertex_; auto to_vertex = inMemoryVATo->vertex_;
// Obtain the locks by `gid` order to avoid lock cycles. // Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock); std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
@ -741,29 +552,31 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from, VertexA
// Increment edge count. // Increment edge count.
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel); storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, return Result<std::unique_ptr<EdgeAccessor>>{std::make_unique<InMemoryEdgeAccessor>(
&storage_->constraints_, config_); edge, edge_type, from_vertex, to_vertex, &transaction_, &storage_->indices_, &storage_->constraints_, config_)};
} }
Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) { Result<std::unique_ptr<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdge(EdgeAccessor *edge) {
MG_ASSERT(edge->transaction_ == &transaction_, auto *inMemoryEA = dynamic_cast<InMemoryEdgeAccessor *>(edge);
MG_ASSERT(inMemoryEA, "EdgeAccessor must be from the same storage as the storage accessor when deleting an edge!");
MG_ASSERT(inMemoryEA->transaction_ == &transaction_,
"EdgeAccessor must be from the same transaction as the storage " "EdgeAccessor must be from the same transaction as the storage "
"accessor when deleting an edge!"); "accessor when deleting an edge!");
auto edge_ref = edge->edge_; auto edge_ref = inMemoryEA->edge_;
auto edge_type = edge->edge_type_; auto edge_type = inMemoryEA->edge_type_;
std::unique_lock<utils::SpinLock> guard; std::unique_lock<utils::SpinLock> guard;
if (config_.properties_on_edges) { if (config_.properties_on_edges) {
auto edge_ptr = edge_ref.ptr; auto *edge_ptr = edge_ref.ptr;
guard = std::unique_lock<utils::SpinLock>(edge_ptr->lock); guard = std::unique_lock<utils::SpinLock>(edge_ptr->lock);
if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR; if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;
if (edge_ptr->deleted) return std::optional<EdgeAccessor>{}; if (edge_ptr->deleted) return Result<std::unique_ptr<EdgeAccessor>>{std::unique_ptr<InMemoryEdgeAccessor>()};
} }
auto *from_vertex = edge->from_vertex_; auto *from_vertex = inMemoryEA->from_vertex_;
auto *to_vertex = edge->to_vertex_; auto *to_vertex = inMemoryEA->to_vertex_;
// Obtain the locks by `gid` order to avoid lock cycles. // Obtain the locks by `gid` order to avoid lock cycles.
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock); std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
@ -809,7 +622,7 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!"); MG_ASSERT((op1 && op2) || (!op1 && !op2), "Invalid database state!");
if (!op1 && !op2) { if (!op1 && !op2) {
// The edge is already deleted. // The edge is already deleted.
return std::optional<EdgeAccessor>{}; return Result<std::unique_ptr<EdgeAccessor>>{std::unique_ptr<InMemoryEdgeAccessor>()};
} }
} }
@ -825,29 +638,38 @@ Result<std::optional<EdgeAccessor>> Storage::Accessor::DeleteEdge(EdgeAccessor *
// Decrement edge count. // Decrement edge count.
storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel); storage_->edge_count_.fetch_add(-1, std::memory_order_acq_rel);
return std::make_optional<EdgeAccessor>(edge_ref, edge_type, from_vertex, to_vertex, &transaction_, return Result<std::unique_ptr<EdgeAccessor>>{
&storage_->indices_, &storage_->constraints_, config_, true); std::make_unique<InMemoryEdgeAccessor>(edge_ref, edge_type, from_vertex, to_vertex, &transaction_,
&storage_->indices_, &storage_->constraints_, config_, true)};
} }
const std::string &Storage::Accessor::LabelToName(LabelId label) const { return storage_->LabelToName(label); } const std::string &InMemoryStorage::InMemoryAccessor::LabelToName(LabelId label) const {
return storage_->LabelToName(label);
}
const std::string &Storage::Accessor::PropertyToName(PropertyId property) const { const std::string &InMemoryStorage::InMemoryAccessor::PropertyToName(PropertyId property) const {
return storage_->PropertyToName(property); return storage_->PropertyToName(property);
} }
const std::string &Storage::Accessor::EdgeTypeToName(EdgeTypeId edge_type) const { const std::string &InMemoryStorage::InMemoryAccessor::EdgeTypeToName(EdgeTypeId edge_type) const {
return storage_->EdgeTypeToName(edge_type); return storage_->EdgeTypeToName(edge_type);
} }
LabelId Storage::Accessor::NameToLabel(const std::string_view name) { return storage_->NameToLabel(name); } LabelId InMemoryStorage::InMemoryAccessor::NameToLabel(const std::string_view name) {
return storage_->NameToLabel(name);
}
PropertyId Storage::Accessor::NameToProperty(const std::string_view name) { return storage_->NameToProperty(name); } PropertyId InMemoryStorage::InMemoryAccessor::NameToProperty(const std::string_view name) {
return storage_->NameToProperty(name);
}
EdgeTypeId Storage::Accessor::NameToEdgeType(const std::string_view name) { return storage_->NameToEdgeType(name); } EdgeTypeId InMemoryStorage::InMemoryAccessor::NameToEdgeType(const std::string_view name) {
return storage_->NameToEdgeType(name);
}
void Storage::Accessor::AdvanceCommand() { ++transaction_.command_id; } void InMemoryStorage::InMemoryAccessor::AdvanceCommand() { ++transaction_.command_id; }
utils::BasicResult<StorageDataManipulationError, void> Storage::Accessor::Commit( utils::BasicResult<StorageDataManipulationError, void> InMemoryStorage::InMemoryAccessor::Commit(
const std::optional<uint64_t> desired_commit_timestamp) { const std::optional<uint64_t> desired_commit_timestamp) {
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!"); MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!"); MG_ASSERT(!transaction_.must_abort, "The transaction can't be committed!");
@ -969,7 +791,7 @@ utils::BasicResult<StorageDataManipulationError, void> Storage::Accessor::Commit
return {}; return {};
} }
void Storage::Accessor::Abort() { void InMemoryStorage::InMemoryAccessor::Abort() {
MG_ASSERT(is_transaction_active_, "The transaction is already terminated!"); MG_ASSERT(is_transaction_active_, "The transaction is already terminated!");
// We collect vertices and edges we've created here and then splice them into // We collect vertices and edges we've created here and then splice them into
@ -1135,7 +957,7 @@ void Storage::Accessor::Abort() {
is_transaction_active_ = false; is_transaction_active_ = false;
} }
void Storage::Accessor::FinalizeTransaction() { void InMemoryStorage::InMemoryAccessor::FinalizeTransaction() {
if (commit_timestamp_) { if (commit_timestamp_) {
storage_->commit_log_->MarkFinished(*commit_timestamp_); storage_->commit_log_->MarkFinished(*commit_timestamp_);
storage_->committed_transactions_.WithLock( storage_->committed_transactions_.WithLock(
@ -1144,34 +966,38 @@ void Storage::Accessor::FinalizeTransaction() {
} }
} }
std::optional<uint64_t> Storage::Accessor::GetTransactionId() const { std::optional<uint64_t> InMemoryStorage::InMemoryAccessor::GetTransactionId() const {
if (is_transaction_active_) { if (is_transaction_active_) {
return transaction_.transaction_id.load(std::memory_order_acquire); return transaction_.transaction_id.load(std::memory_order_acquire);
} }
return {}; return {};
} }
const std::string &Storage::LabelToName(LabelId label) const { return name_id_mapper_.IdToName(label.AsUint()); } const std::string &InMemoryStorage::LabelToName(LabelId label) const {
return name_id_mapper_.IdToName(label.AsUint());
}
const std::string &Storage::PropertyToName(PropertyId property) const { const std::string &InMemoryStorage::PropertyToName(PropertyId property) const {
return name_id_mapper_.IdToName(property.AsUint()); return name_id_mapper_.IdToName(property.AsUint());
} }
const std::string &Storage::EdgeTypeToName(EdgeTypeId edge_type) const { const std::string &InMemoryStorage::EdgeTypeToName(EdgeTypeId edge_type) const {
return name_id_mapper_.IdToName(edge_type.AsUint()); return name_id_mapper_.IdToName(edge_type.AsUint());
} }
LabelId Storage::NameToLabel(const std::string_view name) { return LabelId::FromUint(name_id_mapper_.NameToId(name)); } LabelId InMemoryStorage::NameToLabel(const std::string_view name) {
return LabelId::FromUint(name_id_mapper_.NameToId(name));
}
PropertyId Storage::NameToProperty(const std::string_view name) { PropertyId InMemoryStorage::NameToProperty(const std::string_view name) {
return PropertyId::FromUint(name_id_mapper_.NameToId(name)); return PropertyId::FromUint(name_id_mapper_.NameToId(name));
} }
EdgeTypeId Storage::NameToEdgeType(const std::string_view name) { EdgeTypeId InMemoryStorage::NameToEdgeType(const std::string_view name) {
return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name)); return EdgeTypeId::FromUint(name_id_mapper_.NameToId(name));
} }
utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex( utils::BasicResult<StorageIndexDefinitionError, void> InMemoryStorage::CreateIndex(
LabelId label, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.CreateIndex(label, vertices_.access())) { if (!indices_.label_index.CreateIndex(label, vertices_.access())) {
@ -1190,7 +1016,7 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex(
return StorageIndexDefinitionError{ReplicationError{}}; return StorageIndexDefinitionError{ReplicationError{}};
} }
utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex( utils::BasicResult<StorageIndexDefinitionError, void> InMemoryStorage::CreateIndex(
LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) { if (!indices_.label_property_index.CreateIndex(label, property, vertices_.access())) {
@ -1209,7 +1035,7 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::CreateIndex(
return StorageIndexDefinitionError{ReplicationError{}}; return StorageIndexDefinitionError{ReplicationError{}};
} }
utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex( utils::BasicResult<StorageIndexDefinitionError, void> InMemoryStorage::DropIndex(
LabelId label, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_index.DropIndex(label)) { if (!indices_.label_index.DropIndex(label)) {
@ -1228,7 +1054,7 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex(
return StorageIndexDefinitionError{ReplicationError{}}; return StorageIndexDefinitionError{ReplicationError{}};
} }
utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex( utils::BasicResult<StorageIndexDefinitionError, void> InMemoryStorage::DropIndex(
LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!indices_.label_property_index.DropIndex(label, property)) { if (!indices_.label_property_index.DropIndex(label, property)) {
@ -1249,12 +1075,12 @@ utils::BasicResult<StorageIndexDefinitionError, void> Storage::DropIndex(
return StorageIndexDefinitionError{ReplicationError{}}; return StorageIndexDefinitionError{ReplicationError{}};
} }
IndicesInfo Storage::ListAllIndices() const { IndicesInfo InMemoryStorage::ListAllIndices() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_); std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()}; return {indices_.label_index.ListIndices(), indices_.label_property_index.ListIndices()};
} }
utils::BasicResult<StorageExistenceConstraintDefinitionError, void> Storage::CreateExistenceConstraint( utils::BasicResult<StorageExistenceConstraintDefinitionError, void> InMemoryStorage::CreateExistenceConstraint(
LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = storage::CreateExistenceConstraint(&constraints_, label, property, vertices_.access()); auto ret = storage::CreateExistenceConstraint(&constraints_, label, property, vertices_.access());
@ -1278,7 +1104,7 @@ utils::BasicResult<StorageExistenceConstraintDefinitionError, void> Storage::Cre
return StorageExistenceConstraintDefinitionError{ReplicationError{}}; return StorageExistenceConstraintDefinitionError{ReplicationError{}};
} }
utils::BasicResult<StorageExistenceConstraintDroppingError, void> Storage::DropExistenceConstraint( utils::BasicResult<StorageExistenceConstraintDroppingError, void> InMemoryStorage::DropExistenceConstraint(
LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) { LabelId label, PropertyId property, const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
if (!storage::DropExistenceConstraint(&constraints_, label, property)) { if (!storage::DropExistenceConstraint(&constraints_, label, property)) {
@ -1298,8 +1124,8 @@ utils::BasicResult<StorageExistenceConstraintDroppingError, void> Storage::DropE
} }
utils::BasicResult<StorageUniqueConstraintDefinitionError, UniqueConstraints::CreationStatus> utils::BasicResult<StorageUniqueConstraintDefinitionError, UniqueConstraints::CreationStatus>
Storage::CreateUniqueConstraint(LabelId label, const std::set<PropertyId> &properties, InMemoryStorage::CreateUniqueConstraint(LabelId label, const std::set<PropertyId> &properties,
const std::optional<uint64_t> desired_commit_timestamp) { const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access()); auto ret = constraints_.unique_constraints.CreateConstraint(label, properties, vertices_.access());
if (ret.HasError()) { if (ret.HasError()) {
@ -1322,8 +1148,8 @@ Storage::CreateUniqueConstraint(LabelId label, const std::set<PropertyId> &prope
} }
utils::BasicResult<StorageUniqueConstraintDroppingError, UniqueConstraints::DeletionStatus> utils::BasicResult<StorageUniqueConstraintDroppingError, UniqueConstraints::DeletionStatus>
Storage::DropUniqueConstraint(LabelId label, const std::set<PropertyId> &properties, InMemoryStorage::DropUniqueConstraint(LabelId label, const std::set<PropertyId> &properties,
const std::optional<uint64_t> desired_commit_timestamp) { const std::optional<uint64_t> desired_commit_timestamp) {
std::unique_lock<utils::RWLock> storage_guard(main_lock_); std::unique_lock<utils::RWLock> storage_guard(main_lock_);
auto ret = constraints_.unique_constraints.DropConstraint(label, properties); auto ret = constraints_.unique_constraints.DropConstraint(label, properties);
if (ret != UniqueConstraints::DeletionStatus::SUCCESS) { if (ret != UniqueConstraints::DeletionStatus::SUCCESS) {
@ -1342,12 +1168,12 @@ Storage::DropUniqueConstraint(LabelId label, const std::set<PropertyId> &propert
return StorageUniqueConstraintDroppingError{ReplicationError{}}; return StorageUniqueConstraintDroppingError{ReplicationError{}};
} }
ConstraintsInfo Storage::ListAllConstraints() const { ConstraintsInfo InMemoryStorage::ListAllConstraints() const {
std::shared_lock<utils::RWLock> storage_guard_(main_lock_); std::shared_lock<utils::RWLock> storage_guard_(main_lock_);
return {ListExistenceConstraints(constraints_), constraints_.unique_constraints.ListConstraints()}; return {ListExistenceConstraints(constraints_), constraints_.unique_constraints.ListConstraints()};
} }
StorageInfo Storage::GetInfo() const { StorageInfo InMemoryStorage::GetInfo() const {
auto vertex_count = vertices_.size(); auto vertex_count = vertices_.size();
auto edge_count = edge_count_.load(std::memory_order_acquire); auto edge_count = edge_count_.load(std::memory_order_acquire);
double average_degree = 0.0; double average_degree = 0.0;
@ -1358,29 +1184,29 @@ StorageInfo Storage::GetInfo() const {
utils::GetDirDiskUsage(config_.durability.storage_directory)}; utils::GetDirDiskUsage(config_.durability.storage_directory)};
} }
VerticesIterable Storage::Accessor::Vertices(LabelId label, View view) { VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, View view) {
return VerticesIterable(storage_->indices_.label_index.Vertices(label, view, &transaction_)); return VerticesIterable(storage_->indices_.label_index.Vertices(label, view, &transaction_));
} }
VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property, View view) { VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property, View view) {
return VerticesIterable(storage_->indices_.label_property_index.Vertices(label, property, std::nullopt, std::nullopt, return VerticesIterable(storage_->indices_.label_property_index.Vertices(label, property, std::nullopt, std::nullopt,
view, &transaction_)); view, &transaction_));
} }
VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property, const PropertyValue &value, VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(LabelId label, PropertyId property,
View view) { const PropertyValue &value, View view) {
return VerticesIterable(storage_->indices_.label_property_index.Vertices( return VerticesIterable(storage_->indices_.label_property_index.Vertices(
label, property, utils::MakeBoundInclusive(value), utils::MakeBoundInclusive(value), view, &transaction_)); label, property, utils::MakeBoundInclusive(value), utils::MakeBoundInclusive(value), view, &transaction_));
} }
VerticesIterable Storage::Accessor::Vertices(LabelId label, PropertyId property, VerticesIterable InMemoryStorage::InMemoryAccessor::Vertices(
const std::optional<utils::Bound<PropertyValue>> &lower_bound, LabelId label, PropertyId property, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) { const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) {
return VerticesIterable( return VerticesIterable(
storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_)); storage_->indices_.label_property_index.Vertices(label, property, lower_bound, upper_bound, view, &transaction_));
} }
Transaction Storage::CreateTransaction(IsolationLevel isolation_level) { Transaction InMemoryStorage::CreateTransaction(IsolationLevel isolation_level) {
// We acquire the transaction engine lock here because we access (and // We acquire the transaction engine lock here because we access (and
// modify) the transaction engine variables (`transaction_id` and // modify) the transaction engine variables (`transaction_id` and
// `timestamp`) below. // `timestamp`) below.
@ -1405,7 +1231,7 @@ Transaction Storage::CreateTransaction(IsolationLevel isolation_level) {
} }
template <bool force> template <bool force>
void Storage::CollectGarbage() { void InMemoryStorage::CollectGarbage() {
if constexpr (force) { if constexpr (force) {
// We take the unique lock on the main storage lock so we can forcefully clean // We take the unique lock on the main storage lock so we can forcefully clean
// everything we can // everything we can
@ -1660,10 +1486,10 @@ void Storage::CollectGarbage() {
} }
// tell the linker he can find the CollectGarbage definitions here // tell the linker he can find the CollectGarbage definitions here
template void Storage::CollectGarbage<true>(); template void InMemoryStorage::CollectGarbage<true>();
template void Storage::CollectGarbage<false>(); template void InMemoryStorage::CollectGarbage<false>();
bool Storage::InitializeWalFile() { bool InMemoryStorage::InitializeWalFile() {
if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL) if (config_.durability.snapshot_wal_mode != Config::Durability::SnapshotWalMode::PERIODIC_SNAPSHOT_WITH_WAL)
return false; return false;
if (!wal_file_) { if (!wal_file_) {
@ -1673,7 +1499,7 @@ bool Storage::InitializeWalFile() {
return true; return true;
} }
void Storage::FinalizeWalFile() { void InMemoryStorage::FinalizeWalFile() {
++wal_unsynced_transactions_; ++wal_unsynced_transactions_;
if (wal_unsynced_transactions_ >= config_.durability.wal_file_flush_every_n_tx) { if (wal_unsynced_transactions_ >= config_.durability.wal_file_flush_every_n_tx) {
wal_file_->Sync(); wal_file_->Sync();
@ -1692,7 +1518,7 @@ void Storage::FinalizeWalFile() {
} }
} }
bool Storage::AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp) { bool InMemoryStorage::AppendToWalDataManipulation(const Transaction &transaction, uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) { if (!InitializeWalFile()) {
return true; return true;
} }
@ -1879,8 +1705,9 @@ bool Storage::AppendToWalDataManipulation(const Transaction &transaction, uint64
return finalized_on_all_replicas; return finalized_on_all_replicas;
} }
bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label, bool InMemoryStorage::AppendToWalDataDefinition(durability::StorageGlobalOperation operation, LabelId label,
const std::set<PropertyId> &properties, uint64_t final_commit_timestamp) { const std::set<PropertyId> &properties,
uint64_t final_commit_timestamp) {
if (!InitializeWalFile()) { if (!InitializeWalFile()) {
return true; return true;
} }
@ -1907,7 +1734,7 @@ bool Storage::AppendToWalDataDefinition(durability::StorageGlobalOperation opera
return finalized_on_all_replicas; return finalized_on_all_replicas;
} }
utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() { utils::BasicResult<InMemoryStorage::CreateSnapshotError> InMemoryStorage::CreateSnapshot() {
if (replication_role_.load() != ReplicationRole::MAIN) { if (replication_role_.load() != ReplicationRole::MAIN) {
return CreateSnapshotError::DisabledForReplica; return CreateSnapshotError::DisabledForReplica;
} }
@ -1931,12 +1758,12 @@ utils::BasicResult<Storage::CreateSnapshotError> Storage::CreateSnapshot() {
return {}; return {};
} }
bool Storage::LockPath() { bool InMemoryStorage::LockPath() {
auto locker_accessor = global_locker_.Access(); auto locker_accessor = global_locker_.Access();
return locker_accessor.AddPath(config_.durability.storage_directory); return locker_accessor.AddPath(config_.durability.storage_directory);
} }
bool Storage::UnlockPath() { bool InMemoryStorage::UnlockPath() {
{ {
auto locker_accessor = global_locker_.Access(); auto locker_accessor = global_locker_.Access();
if (!locker_accessor.RemovePath(config_.durability.storage_directory)) { if (!locker_accessor.RemovePath(config_.durability.storage_directory)) {
@ -1950,7 +1777,7 @@ bool Storage::UnlockPath() {
return true; return true;
} }
void Storage::FreeMemory() { void InMemoryStorage::FreeMemory() {
CollectGarbage<true>(); CollectGarbage<true>();
// SkipList is already threadsafe // SkipList is already threadsafe
@ -1960,7 +1787,7 @@ void Storage::FreeMemory() {
indices_.label_property_index.RunGC(); indices_.label_property_index.RunGC();
} }
uint64_t Storage::CommitTimestamp(const std::optional<uint64_t> desired_commit_timestamp) { uint64_t InMemoryStorage::CommitTimestamp(const std::optional<uint64_t> desired_commit_timestamp) {
if (!desired_commit_timestamp) { if (!desired_commit_timestamp) {
return timestamp_++; return timestamp_++;
} else { } else {
@ -1969,7 +1796,8 @@ uint64_t Storage::CommitTimestamp(const std::optional<uint64_t> desired_commit_t
} }
} }
bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::ReplicationServerConfig &config) { bool InMemoryStorage::SetReplicaRole(io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config) {
// We don't want to restart the server if we're already a REPLICA // We don't want to restart the server if we're already a REPLICA
if (replication_role_ == ReplicationRole::REPLICA) { if (replication_role_ == ReplicationRole::REPLICA) {
return false; return false;
@ -1981,7 +1809,7 @@ bool Storage::SetReplicaRole(io::network::Endpoint endpoint, const replication::
return true; return true;
} }
bool Storage::SetMainReplicationRole() { bool InMemoryStorage::SetMainReplicationRole() {
// We don't want to generate new epoch_id and do the // We don't want to generate new epoch_id and do the
// cleanup if we're already a MAIN // cleanup if we're already a MAIN
if (replication_role_ == ReplicationRole::MAIN) { if (replication_role_ == ReplicationRole::MAIN) {
@ -2011,7 +1839,7 @@ bool Storage::SetMainReplicationRole() {
return true; return true;
} }
utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica( utils::BasicResult<InMemoryStorage::RegisterReplicaError> InMemoryStorage::RegisterReplica(
std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode, std::string name, io::network::Endpoint endpoint, const replication::ReplicationMode replication_mode,
const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) { const replication::RegistrationMode registration_mode, const replication::ReplicationClientConfig &config) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!"); MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can register a replica!");
@ -2057,7 +1885,7 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name()); spdlog::warn("Connection failed when registering replica {}. Replica will still be registered.", client->Name());
} }
return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<Storage::RegisterReplicaError> { return replication_clients_.WithLock([&](auto &clients) -> utils::BasicResult<InMemoryStorage::RegisterReplicaError> {
// Another thread could have added a client with same name while // Another thread could have added a client with same name while
// we were connecting to this client. // we were connecting to this client.
if (std::any_of(clients.begin(), clients.end(), if (std::any_of(clients.begin(), clients.end(),
@ -2075,7 +1903,7 @@ utils::BasicResult<Storage::RegisterReplicaError> Storage::RegisterReplica(
}); });
} }
bool Storage::UnregisterReplica(const std::string &name) { bool InMemoryStorage::UnregisterReplica(const std::string &name) {
MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!"); MG_ASSERT(replication_role_.load() == ReplicationRole::MAIN, "Only main instance can unregister a replica!");
if (ShouldStoreAndRestoreReplicas()) { if (ShouldStoreAndRestoreReplicas()) {
if (!storage_->Delete(name)) { if (!storage_->Delete(name)) {
@ -2089,7 +1917,7 @@ bool Storage::UnregisterReplica(const std::string &name) {
}); });
} }
std::optional<replication::ReplicaState> Storage::GetReplicaState(const std::string_view name) { std::optional<replication::ReplicaState> InMemoryStorage::GetReplicaState(const std::string_view name) {
return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> { return replication_clients_.WithLock([&](auto &clients) -> std::optional<replication::ReplicaState> {
const auto client_it = const auto client_it =
std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; }); std::find_if(clients.cbegin(), clients.cend(), [name](auto &client) { return client->Name() == name; });
@ -2100,11 +1928,11 @@ std::optional<replication::ReplicaState> Storage::GetReplicaState(const std::str
}); });
} }
ReplicationRole Storage::GetReplicationRole() const { return replication_role_; } ReplicationRole InMemoryStorage::GetReplicationRole() const { return replication_role_; }
std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() { std::vector<InMemoryStorage::ReplicaInfo> InMemoryStorage::ReplicasInfo() {
return replication_clients_.WithLock([](auto &clients) { return replication_clients_.WithLock([](auto &clients) {
std::vector<Storage::ReplicaInfo> replica_info; std::vector<InMemoryStorage::ReplicaInfo> replica_info;
replica_info.reserve(clients.size()); replica_info.reserve(clients.size());
std::transform( std::transform(
clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo { clients.begin(), clients.end(), std::back_inserter(replica_info), [](const auto &client) -> ReplicaInfo {
@ -2114,12 +1942,12 @@ std::vector<Storage::ReplicaInfo> Storage::ReplicasInfo() {
}); });
} }
void Storage::SetIsolationLevel(IsolationLevel isolation_level) { void InMemoryStorage::SetIsolationLevel(IsolationLevel isolation_level) {
std::unique_lock main_guard{main_lock_}; std::unique_lock main_guard{main_lock_};
isolation_level_ = isolation_level; isolation_level_ = isolation_level;
} }
void Storage::RestoreReplicas() { void InMemoryStorage::RestoreReplicas() {
MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole()); MG_ASSERT(memgraph::storage::ReplicationRole::MAIN == GetReplicationRole());
if (!ShouldStoreAndRestoreReplicas()) { if (!ShouldStoreAndRestoreReplicas()) {
return; return;
@ -2154,6 +1982,6 @@ void Storage::RestoreReplicas() {
} }
} }
bool Storage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; } bool InMemoryStorage::ShouldStoreAndRestoreReplicas() const { return nullptr != storage_; }
} // namespace memgraph::storage } // namespace memgraph::storage

View File

@ -71,49 +71,6 @@ enum class ReplicationRole : uint8_t { MAIN, REPLICA };
// The paper implements a fully serializable storage, in our implementation we // The paper implements a fully serializable storage, in our implementation we
// only implement snapshot isolation for transactions. // only implement snapshot isolation for transactions.
/// Iterable for iterating through all vertices of a Storage.
///
/// An instance of this will be usually be wrapped inside VerticesIterable for
/// generic, public use.
class AllVerticesIterable final {
utils::SkipList<Vertex>::Accessor vertices_accessor_;
Transaction *transaction_;
View view_;
Indices *indices_;
Constraints *constraints_;
Config::Items config_;
VertexAccessor *vertex_;
public:
class Iterator final {
AllVerticesIterable *self_;
utils::SkipList<Vertex>::Iterator it_;
public:
Iterator(AllVerticesIterable *self, utils::SkipList<Vertex>::Iterator it);
VertexAccessor *operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const { return self_ == other.self_ && it_ == other.it_; }
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
AllVerticesIterable(utils::SkipList<Vertex>::Accessor vertices_accessor, Transaction *transaction, View view,
Indices *indices, Constraints *constraints, Config::Items config)
: vertices_accessor_(std::move(vertices_accessor)),
transaction_(transaction),
view_(view),
indices_(indices),
constraints_(constraints),
config_(config) {}
Iterator begin() { return Iterator(this, vertices_accessor_.begin()); }
Iterator end() { return Iterator(this, vertices_accessor_.end()); }
};
class InMemoryStorage final { class InMemoryStorage final {
public: public:
/// @throw std::system_error /// @throw std::system_error
@ -122,64 +79,64 @@ class InMemoryStorage final {
~InMemoryStorage(); ~InMemoryStorage();
class Accessor final { class InMemoryAccessor final : public Accessor {
private: private:
friend class InMemoryStorage; friend class InMemoryStorage;
explicit Accessor(InMemoryStorage *storage, IsolationLevel isolation_level); explicit InMemoryAccessor(InMemoryStorage *storage, IsolationLevel isolation_level);
public: public:
Accessor(const Accessor &) = delete; InMemoryAccessor(const InMemoryAccessor &) = delete;
Accessor &operator=(const Accessor &) = delete; InMemoryAccessor &operator=(const InMemoryAccessor &) = delete;
Accessor &operator=(Accessor &&other) = delete; InMemoryAccessor &operator=(InMemoryAccessor &&other) = delete;
// NOTE: After the accessor is moved, all objects derived from it (accessors // NOTE: After the accessor is moved, all objects derived from it (accessors
// and iterators) are *invalid*. You have to get all derived objects again. // and iterators) are *invalid*. You have to get all derived objects again.
Accessor(Accessor &&other) noexcept; InMemoryAccessor(InMemoryAccessor &&other) noexcept;
~Accessor(); ~InMemoryAccessor() override;
/// @throw std::bad_alloc /// @throw std::bad_alloc
VertexAccessor CreateVertex(); std::unique_ptr<VertexAccessor> CreateVertex() override;
std::optional<VertexAccessor> FindVertex(Gid gid, View view); std::unique_ptr<VertexAccessor> FindVertex(Gid gid, View view) override;
VerticesIterable Vertices(View view) { VerticesIterable Vertices(View view) override {
return VerticesIterable(AllVerticesIterable(storage_->vertices_.access(), &transaction_, view, return VerticesIterable(AllVerticesIterable(storage_->vertices_.access(), &transaction_, view,
&storage_->indices_, &storage_->constraints_, &storage_->indices_, &storage_->constraints_,
storage_->config_.items)); storage_->config_.items));
} }
VerticesIterable Vertices(LabelId label, View view); VerticesIterable Vertices(LabelId label, View view) override;
VerticesIterable Vertices(LabelId label, PropertyId property, View view); VerticesIterable Vertices(LabelId label, PropertyId property, View view) override;
VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view); VerticesIterable Vertices(LabelId label, PropertyId property, const PropertyValue &value, View view) override;
VerticesIterable Vertices(LabelId label, PropertyId property, VerticesIterable Vertices(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower_bound, const std::optional<utils::Bound<PropertyValue>> &lower_bound,
const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view); const std::optional<utils::Bound<PropertyValue>> &upper_bound, View view) override;
/// Return approximate number of all vertices in the database. /// Return approximate number of all vertices in the database.
/// Note that this is always an over-estimate and never an under-estimate. /// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount() const { return storage_->vertices_.size(); } int64_t ApproximateVertexCount() const override { return storage_->vertices_.size(); }
/// Return approximate number of vertices with the given label. /// Return approximate number of vertices with the given label.
/// Note that this is always an over-estimate and never an under-estimate. /// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label) const { int64_t ApproximateVertexCount(LabelId label) const override {
return storage_->indices_.label_index.ApproximateVertexCount(label); return storage_->indices_.label_index.ApproximateVertexCount(label);
} }
/// Return approximate number of vertices with the given label and property. /// Return approximate number of vertices with the given label and property.
/// Note that this is always an over-estimate and never an under-estimate. /// Note that this is always an over-estimate and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property) const { int64_t ApproximateVertexCount(LabelId label, PropertyId property) const override {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property); return storage_->indices_.label_property_index.ApproximateVertexCount(label, property);
} }
/// Return approximate number of vertices with the given label and the given /// Return approximate number of vertices with the given label and the given
/// value for the given property. Note that this is always an over-estimate /// value for the given property. Note that this is always an over-estimate
/// and never an under-estimate. /// and never an under-estimate.
int64_t ApproximateVertexCount(LabelId label, PropertyId property, const PropertyValue &value) const { int64_t ApproximateVertexCount(LabelId label, PropertyId property, const PropertyValue &value) const override {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, value); return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, value);
} }
@ -188,20 +145,21 @@ class InMemoryStorage final {
/// bounds. /// bounds.
int64_t ApproximateVertexCount(LabelId label, PropertyId property, int64_t ApproximateVertexCount(LabelId label, PropertyId property,
const std::optional<utils::Bound<PropertyValue>> &lower, const std::optional<utils::Bound<PropertyValue>> &lower,
const std::optional<utils::Bound<PropertyValue>> &upper) const { const std::optional<utils::Bound<PropertyValue>> &upper) const override {
return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, lower, upper); return storage_->indices_.label_property_index.ApproximateVertexCount(label, property, lower, upper);
} }
std::optional<storage::IndexStats> GetIndexStats(const storage::LabelId &label, std::optional<storage::IndexStats> GetIndexStats(const storage::LabelId &label,
const storage::PropertyId &property) const { const storage::PropertyId &property) const override {
return storage_->indices_.label_property_index.GetIndexStats(label, property); return storage_->indices_.label_property_index.GetIndexStats(label, property);
} }
std::vector<std::pair<LabelId, PropertyId>> ClearIndexStats() { std::vector<std::pair<LabelId, PropertyId>> ClearIndexStats() override {
return storage_->indices_.label_property_index.ClearIndexStats(); return storage_->indices_.label_property_index.ClearIndexStats();
} }
std::vector<std::pair<LabelId, PropertyId>> DeleteIndexStatsForLabels(const std::span<std::string> labels) { std::vector<std::pair<LabelId, PropertyId>> DeleteIndexStatsForLabels(
const std::span<std::string> labels) override {
std::vector<std::pair<LabelId, PropertyId>> deleted_indexes; std::vector<std::pair<LabelId, PropertyId>> deleted_indexes;
std::for_each(labels.begin(), labels.end(), [this, &deleted_indexes](const auto &label_str) { std::for_each(labels.begin(), labels.end(), [this, &deleted_indexes](const auto &label_str) {
std::vector<std::pair<LabelId, PropertyId>> loc_results = std::vector<std::pair<LabelId, PropertyId>> loc_results =
@ -212,55 +170,57 @@ class InMemoryStorage final {
return deleted_indexes; return deleted_indexes;
} }
void SetIndexStats(const storage::LabelId &label, const storage::PropertyId &property, const IndexStats &stats) { void SetIndexStats(const storage::LabelId &label, const storage::PropertyId &property,
const IndexStats &stats) override {
storage_->indices_.label_property_index.SetIndexStats(label, property, stats); storage_->indices_.label_property_index.SetIndexStats(label, property, stats);
} }
/// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise /// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex); Result<std::unique_ptr<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) override;
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise /// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex( Result<std::optional<std::pair<std::unique_ptr<VertexAccessor>, std::vector<std::unique_ptr<EdgeAccessor>>>>>
VertexAccessor *vertex); DetachDeleteVertex(VertexAccessor *vertex) override;
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type); Result<std::unique_ptr<EdgeAccessor>> CreateEdge(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type) override;
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise /// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge); Result<std::unique_ptr<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) override;
const std::string &LabelToName(LabelId label) const; const std::string &LabelToName(LabelId label) const override;
const std::string &PropertyToName(PropertyId property) const; const std::string &PropertyToName(PropertyId property) const override;
const std::string &EdgeTypeToName(EdgeTypeId edge_type) const; const std::string &EdgeTypeToName(EdgeTypeId edge_type) const override;
/// @throw std::bad_alloc if unable to insert a new mapping /// @throw std::bad_alloc if unable to insert a new mapping
LabelId NameToLabel(std::string_view name); LabelId NameToLabel(std::string_view name) override;
/// @throw std::bad_alloc if unable to insert a new mapping /// @throw std::bad_alloc if unable to insert a new mapping
PropertyId NameToProperty(std::string_view name); PropertyId NameToProperty(std::string_view name) override;
/// @throw std::bad_alloc if unable to insert a new mapping /// @throw std::bad_alloc if unable to insert a new mapping
EdgeTypeId NameToEdgeType(std::string_view name); EdgeTypeId NameToEdgeType(std::string_view name) override;
bool LabelIndexExists(LabelId label) const { return storage_->indices_.label_index.IndexExists(label); } bool LabelIndexExists(LabelId label) const override { return storage_->indices_.label_index.IndexExists(label); }
bool LabelPropertyIndexExists(LabelId label, PropertyId property) const { bool LabelPropertyIndexExists(LabelId label, PropertyId property) const override {
return storage_->indices_.label_property_index.IndexExists(label, property); return storage_->indices_.label_property_index.IndexExists(label, property);
} }
IndicesInfo ListAllIndices() const { IndicesInfo ListAllIndices() const override {
return {storage_->indices_.label_index.ListIndices(), storage_->indices_.label_property_index.ListIndices()}; return {storage_->indices_.label_index.ListIndices(), storage_->indices_.label_property_index.ListIndices()};
} }
ConstraintsInfo ListAllConstraints() const { ConstraintsInfo ListAllConstraints() const override {
return {ListExistenceConstraints(storage_->constraints_), return {ListExistenceConstraints(storage_->constraints_),
storage_->constraints_.unique_constraints.ListConstraints()}; storage_->constraints_.unique_constraints.ListConstraints()};
} }
void AdvanceCommand(); void AdvanceCommand() override;
/// Returns void if the transaction has been committed. /// Returns void if the transaction has been committed.
/// Returns `StorageDataManipulationError` if an error occures. Error can be: /// Returns `StorageDataManipulationError` if an error occures. Error can be:
@ -269,21 +229,22 @@ class InMemoryStorage final {
/// case the transaction is automatically aborted. /// case the transaction is automatically aborted.
/// @throw std::bad_alloc /// @throw std::bad_alloc
utils::BasicResult<StorageDataManipulationError, void> Commit( utils::BasicResult<StorageDataManipulationError, void> Commit(
std::optional<uint64_t> desired_commit_timestamp = {}); std::optional<uint64_t> desired_commit_timestamp = {}) override;
/// @throw std::bad_alloc /// @throw std::bad_alloc
void Abort(); void Abort() override;
void FinalizeTransaction(); void FinalizeTransaction() override;
std::optional<uint64_t> GetTransactionId() const; std::optional<uint64_t> GetTransactionId() const override;
private: private:
/// @throw std::bad_alloc /// @throw std::bad_alloc
VertexAccessor CreateVertex(storage::Gid gid); std::unique_ptr<VertexAccessor> CreateVertex(storage::Gid gid);
/// @throw std::bad_alloc /// @throw std::bad_alloc
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type, storage::Gid gid); Result<std::unique_ptr<EdgeAccessor>> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type,
storage::Gid gid);
InMemoryStorage *storage_; InMemoryStorage *storage_;
std::shared_lock<utils::RWLock> storage_guard_; std::shared_lock<utils::RWLock> storage_guard_;
@ -293,8 +254,8 @@ class InMemoryStorage final {
Config::Items config_; Config::Items config_;
}; };
Accessor Access(std::optional<IsolationLevel> override_isolation_level = {}) { InMemoryAccessor Access(std::optional<IsolationLevel> override_isolation_level = {}) {
return Accessor{this, override_isolation_level.value_or(isolation_level_)}; return InMemoryAccessor{this, override_isolation_level.value_or(isolation_level_)};
} }
const std::string &LabelToName(LabelId label) const; const std::string &LabelToName(LabelId label) const;

View File

@ -16,6 +16,7 @@
#include "storage/v2/edge_accessor.hpp" #include "storage/v2/edge_accessor.hpp"
#include "storage/v2/id_types.hpp" #include "storage/v2/id_types.hpp"
#include "storage/v2/indices.hpp" #include "storage/v2/indices.hpp"
#include "storage/v2/inmemory/edge_accessor.hpp"
#include "storage/v2/mvcc.hpp" #include "storage/v2/mvcc.hpp"
#include "storage/v2/property_value.hpp" #include "storage/v2/property_value.hpp"
#include "utils/logging.hpp" #include "utils/logging.hpp"
@ -358,9 +359,11 @@ Result<std::map<PropertyId, PropertyValue>> InMemoryVertexAccessor::Properties(V
return std::move(properties); return std::move(properties);
} }
Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::InEdges(View view, const std::vector<EdgeTypeId> &edge_types, Result<std::vector<std::unique_ptr<EdgeAccessor>>> InMemoryVertexAccessor::InEdges(
const InMemoryVertexAccessor *destination) const { View view, const std::vector<EdgeTypeId> &edge_types, const VertexAccessor *destination) const {
MG_ASSERT(!destination || destination->transaction_ == transaction_, "Invalid accessor!"); auto *destVA = dynamic_cast<const InMemoryVertexAccessor *>(destination);
MG_ASSERT(destVA, "Target VertexAccessor must be from the same storage as the storage accessor!");
MG_ASSERT(!destination || destVA->transaction_ == transaction_, "Invalid accessor!");
bool exists = true; bool exists = true;
bool deleted = false; bool deleted = false;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges; std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
@ -368,12 +371,12 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::InEdges(View view, con
{ {
std::lock_guard<utils::SpinLock> guard(vertex_->lock); std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted; deleted = vertex_->deleted;
if (edge_types.empty() && !destination) { if (edge_types.empty() && !destVA) {
in_edges = vertex_->in_edges; in_edges = vertex_->in_edges;
} else { } else {
for (const auto &item : vertex_->in_edges) { for (const auto &item : vertex_->in_edges) {
const auto &[edge_type, from_vertex, edge] = item; const auto &[edge_type, from_vertex, edge] = item;
if (destination && from_vertex != destination->vertex_) continue; if (destVA && from_vertex != destVA->vertex_) continue;
if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end()) if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end())
continue; continue;
in_edges.push_back(item); in_edges.push_back(item);
@ -382,10 +385,10 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::InEdges(View view, con
delta = vertex_->delta; delta = vertex_->delta;
} }
ApplyDeltasForRead( ApplyDeltasForRead(
transaction_, delta, view, [&exists, &deleted, &in_edges, &edge_types, &destination](const Delta &delta) { transaction_, delta, view, [&exists, &deleted, &in_edges, &edge_types, &destVA](const Delta &delta) {
switch (delta.action) { switch (delta.action) {
case Delta::Action::ADD_IN_EDGE: { case Delta::Action::ADD_IN_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break; if (destVA && delta.vertex_edge.vertex != destVA->vertex_) break;
if (!edge_types.empty() && if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end()) std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break; break;
@ -398,7 +401,7 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::InEdges(View view, con
break; break;
} }
case Delta::Action::REMOVE_IN_EDGE: { case Delta::Action::REMOVE_IN_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break; if (destVA && delta.vertex_edge.vertex != destVA->vertex_) break;
if (!edge_types.empty() && if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end()) std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break; break;
@ -429,18 +432,21 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::InEdges(View view, con
}); });
if (!exists) return Error::NONEXISTENT_OBJECT; if (!exists) return Error::NONEXISTENT_OBJECT;
if (deleted) return Error::DELETED_OBJECT; if (deleted) return Error::DELETED_OBJECT;
std::vector<EdgeAccessor> ret; std::vector<std::unique_ptr<EdgeAccessor>> ret;
ret.reserve(in_edges.size()); ret.reserve(in_edges.size());
for (const auto &item : in_edges) { for (const auto &item : in_edges) {
const auto &[edge_type, from_vertex, edge] = item; const auto &[edge_type, from_vertex, edge] = item;
ret.emplace_back(edge, edge_type, from_vertex, vertex_, transaction_, indices_, constraints_, config_); ret.emplace_back(std::make_unique<InMemoryEdgeAccessor>(edge, edge_type, from_vertex, vertex_, transaction_,
indices_, constraints_, config_));
} }
return std::move(ret); return std::move(ret);
} }
Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::OutEdges(View view, const std::vector<EdgeTypeId> &edge_types, Result<std::vector<std::unique_ptr<EdgeAccessor>>> InMemoryVertexAccessor::OutEdges(
const InMemoryVertexAccessor *destination) const { View view, const std::vector<EdgeTypeId> &edge_types, const VertexAccessor *destination) const {
MG_ASSERT(!destination || destination->transaction_ == transaction_, "Invalid accessor!"); auto *destVA = dynamic_cast<const InMemoryVertexAccessor *>(destination);
MG_ASSERT(destVA, "Target VertexAccessor must be from the same storage as the storage accessor!");
MG_ASSERT(!destVA || destVA->transaction_ == transaction_, "Invalid accessor!");
bool exists = true; bool exists = true;
bool deleted = false; bool deleted = false;
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges; std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
@ -448,12 +454,12 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::OutEdges(View view, co
{ {
std::lock_guard<utils::SpinLock> guard(vertex_->lock); std::lock_guard<utils::SpinLock> guard(vertex_->lock);
deleted = vertex_->deleted; deleted = vertex_->deleted;
if (edge_types.empty() && !destination) { if (edge_types.empty() && !destVA) {
out_edges = vertex_->out_edges; out_edges = vertex_->out_edges;
} else { } else {
for (const auto &item : vertex_->out_edges) { for (const auto &item : vertex_->out_edges) {
const auto &[edge_type, to_vertex, edge] = item; const auto &[edge_type, to_vertex, edge] = item;
if (destination && to_vertex != destination->vertex_) continue; if (destVA && to_vertex != destVA->vertex_) continue;
if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end()) if (!edge_types.empty() && std::find(edge_types.begin(), edge_types.end(), edge_type) == edge_types.end())
continue; continue;
out_edges.push_back(item); out_edges.push_back(item);
@ -462,10 +468,10 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::OutEdges(View view, co
delta = vertex_->delta; delta = vertex_->delta;
} }
ApplyDeltasForRead( ApplyDeltasForRead(
transaction_, delta, view, [&exists, &deleted, &out_edges, &edge_types, &destination](const Delta &delta) { transaction_, delta, view, [&exists, &deleted, &out_edges, &edge_types, &destVA](const Delta &delta) {
switch (delta.action) { switch (delta.action) {
case Delta::Action::ADD_OUT_EDGE: { case Delta::Action::ADD_OUT_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break; if (destVA && delta.vertex_edge.vertex != destVA->vertex_) break;
if (!edge_types.empty() && if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end()) std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break; break;
@ -478,7 +484,7 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::OutEdges(View view, co
break; break;
} }
case Delta::Action::REMOVE_OUT_EDGE: { case Delta::Action::REMOVE_OUT_EDGE: {
if (destination && delta.vertex_edge.vertex != destination->vertex_) break; if (destVA && delta.vertex_edge.vertex != destVA->vertex_) break;
if (!edge_types.empty() && if (!edge_types.empty() &&
std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end()) std::find(edge_types.begin(), edge_types.end(), delta.vertex_edge.edge_type) == edge_types.end())
break; break;
@ -509,11 +515,12 @@ Result<std::vector<EdgeAccessor>> InMemoryVertexAccessor::OutEdges(View view, co
}); });
if (!exists) return Error::NONEXISTENT_OBJECT; if (!exists) return Error::NONEXISTENT_OBJECT;
if (deleted) return Error::DELETED_OBJECT; if (deleted) return Error::DELETED_OBJECT;
std::vector<EdgeAccessor> ret; std::vector<std::unique_ptr<EdgeAccessor>> ret;
ret.reserve(out_edges.size()); ret.reserve(out_edges.size());
for (const auto &item : out_edges) { for (const auto &item : out_edges) {
const auto &[edge_type, to_vertex, edge] = item; const auto &[edge_type, to_vertex, edge] = item;
ret.emplace_back(edge, edge_type, vertex_, to_vertex, transaction_, indices_, constraints_, config_); ret.emplace_back(std::make_unique<InMemoryEdgeAccessor>(edge, edge_type, vertex_, to_vertex, transaction_, indices_,
constraints_, config_));
} }
return std::move(ret); return std::move(ret);
} }

View File

@ -28,9 +28,9 @@ class Storage;
struct Indices; struct Indices;
struct Constraints; struct Constraints;
class InMemoryVertexAccessor : public VertexAccessor { class InMemoryVertexAccessor final : public VertexAccessor {
private: private:
friend class Storage; friend class InMemoryStorage;
public: public:
InMemoryVertexAccessor(Vertex *vertex, Transaction *transaction, Indices *indices, Constraints *constraints, InMemoryVertexAccessor(Vertex *vertex, Transaction *transaction, Indices *indices, Constraints *constraints,
@ -85,22 +85,20 @@ class InMemoryVertexAccessor : public VertexAccessor {
/// @throw std::bad_alloc /// @throw std::bad_alloc
/// @throw std::length_error if the resulting vector exceeds /// @throw std::length_error if the resulting vector exceeds
/// std::vector::max_size(). /// std::vector::max_size().
Result<std::vector<EdgeAccessor>> InEdges(View view, const std::vector<EdgeTypeId> &edge_types, Result<std::vector<std::unique_ptr<EdgeAccessor>>> InEdges(View view, const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const override; const VertexAccessor *destination) const override;
/// @throw std::bad_alloc /// @throw std::bad_alloc
/// @throw std::length_error if the resulting vector exceeds /// @throw std::length_error if the resulting vector exceeds
/// std::vector::max_size(). /// std::vector::max_size().
Result<std::vector<EdgeAccessor>> OutEdges(View view, const std::vector<EdgeTypeId> &edge_types, Result<std::vector<std::unique_ptr<EdgeAccessor>>> OutEdges(View view, const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const override; const VertexAccessor *destination) const override;
Result<size_t> InDegree(View view) const override; Result<size_t> InDegree(View view) const override;
Result<size_t> OutDegree(View view) const override; Result<size_t> OutDegree(View view) const override;
class Gid Gid() const noexcept override { storage::Gid Gid() const noexcept override { return vertex_->gid; }
return vertex_->gid;
}
bool operator==(const VertexAccessor &other) const noexcept override { bool operator==(const VertexAccessor &other) const noexcept override {
const auto *otherVertex = dynamic_cast<const InMemoryVertexAccessor *>(&other); const auto *otherVertex = dynamic_cast<const InMemoryVertexAccessor *>(&other);

View File

@ -30,9 +30,10 @@ template <typename>
} // namespace } // namespace
////// ReplicationClient ////// ////// ReplicationClient //////
Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage, const io::network::Endpoint &endpoint, InMemoryStorage::ReplicationClient::ReplicationClient(std::string name, InMemoryStorage *storage,
const replication::ReplicationMode mode, const io::network::Endpoint &endpoint,
const replication::ReplicationClientConfig &config) const replication::ReplicationMode mode,
const replication::ReplicationClientConfig &config)
: name_(std::move(name)), storage_(storage), mode_(mode) { : name_(std::move(name)), storage_(storage), mode_(mode) {
if (config.ssl) { if (config.ssl) {
rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file); rpc_context_.emplace(config.ssl->key_file, config.ssl->cert_file);
@ -49,14 +50,14 @@ Storage::ReplicationClient::ReplicationClient(std::string name, Storage *storage
} }
} }
void Storage::ReplicationClient::TryInitializeClientAsync() { void InMemoryStorage::ReplicationClient::TryInitializeClientAsync() {
thread_pool_.AddTask([this] { thread_pool_.AddTask([this] {
rpc_client_->Abort(); rpc_client_->Abort();
this->TryInitializeClientSync(); this->TryInitializeClientSync();
}); });
} }
void Storage::ReplicationClient::FrequentCheck() { void InMemoryStorage::ReplicationClient::FrequentCheck() {
const auto is_success = std::invoke([this]() { const auto is_success = std::invoke([this]() {
try { try {
auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()}; auto stream{rpc_client_->Stream<replication::FrequentHeartbeatRpc>()};
@ -82,7 +83,7 @@ void Storage::ReplicationClient::FrequentCheck() {
} }
/// @throws rpc::RpcFailedException /// @throws rpc::RpcFailedException
void Storage::ReplicationClient::InitializeClient() { void InMemoryStorage::ReplicationClient::InitializeClient() {
uint64_t current_commit_timestamp{kTimestampInitialId}; uint64_t current_commit_timestamp{kTimestampInitialId};
std::optional<std::string> epoch_id; std::optional<std::string> epoch_id;
@ -134,7 +135,7 @@ void Storage::ReplicationClient::InitializeClient() {
} }
} }
void Storage::ReplicationClient::TryInitializeClientSync() { void InMemoryStorage::ReplicationClient::TryInitializeClientSync() {
try { try {
InitializeClient(); InitializeClient();
} catch (const rpc::RpcFailedException &) { } catch (const rpc::RpcFailedException &) {
@ -145,19 +146,19 @@ void Storage::ReplicationClient::TryInitializeClientSync() {
} }
} }
void Storage::ReplicationClient::HandleRpcFailure() { void InMemoryStorage::ReplicationClient::HandleRpcFailure() {
spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication")); spdlog::error(utils::MessageWithLink("Couldn't replicate data to {}.", name_, "https://memgr.ph/replication"));
TryInitializeClientAsync(); TryInitializeClientAsync();
} }
replication::SnapshotRes Storage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) { replication::SnapshotRes InMemoryStorage::ReplicationClient::TransferSnapshot(const std::filesystem::path &path) {
auto stream{rpc_client_->Stream<replication::SnapshotRpc>()}; auto stream{rpc_client_->Stream<replication::SnapshotRpc>()};
replication::Encoder encoder(stream.GetBuilder()); replication::Encoder encoder(stream.GetBuilder());
encoder.WriteFile(path); encoder.WriteFile(path);
return stream.AwaitResponse(); return stream.AwaitResponse();
} }
replication::WalFilesRes Storage::ReplicationClient::TransferWalFiles( replication::WalFilesRes InMemoryStorage::ReplicationClient::TransferWalFiles(
const std::vector<std::filesystem::path> &wal_files) { const std::vector<std::filesystem::path> &wal_files) {
MG_ASSERT(!wal_files.empty(), "Wal files list is empty!"); MG_ASSERT(!wal_files.empty(), "Wal files list is empty!");
auto stream{rpc_client_->Stream<replication::WalFilesRpc>(wal_files.size())}; auto stream{rpc_client_->Stream<replication::WalFilesRpc>(wal_files.size())};
@ -170,7 +171,7 @@ replication::WalFilesRes Storage::ReplicationClient::TransferWalFiles(
return stream.AwaitResponse(); return stream.AwaitResponse();
} }
void Storage::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) { void InMemoryStorage::ReplicationClient::StartTransactionReplication(const uint64_t current_wal_seq_num) {
std::unique_lock guard(client_lock_); std::unique_lock guard(client_lock_);
const auto status = replica_state_.load(); const auto status = replica_state_.load();
switch (status) { switch (status) {
@ -204,7 +205,8 @@ void Storage::ReplicationClient::StartTransactionReplication(const uint64_t curr
} }
} }
void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void(ReplicaStream &handler)> &callback) { void InMemoryStorage::ReplicationClient::IfStreamingTransaction(
const std::function<void(ReplicaStream &handler)> &callback) {
// We can only check the state because it guarantees to be only // We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption // valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be // that this and other transaction replication functions can only be
@ -224,7 +226,7 @@ void Storage::ReplicationClient::IfStreamingTransaction(const std::function<void
} }
} }
bool Storage::ReplicationClient::FinalizeTransactionReplication() { bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplication() {
// We can only check the state because it guarantees to be only // We can only check the state because it guarantees to be only
// valid during a single transaction replication (if the assumption // valid during a single transaction replication (if the assumption
// that this and other transaction replication functions can only be // that this and other transaction replication functions can only be
@ -241,7 +243,7 @@ bool Storage::ReplicationClient::FinalizeTransactionReplication() {
} }
} }
bool Storage::ReplicationClient::FinalizeTransactionReplicationInternal() { bool InMemoryStorage::ReplicationClient::FinalizeTransactionReplicationInternal() {
MG_ASSERT(replica_stream_, "Missing stream for transaction deltas"); MG_ASSERT(replica_stream_, "Missing stream for transaction deltas");
try { try {
auto response = replica_stream_->Finalize(); auto response = replica_stream_->Finalize();
@ -265,7 +267,7 @@ bool Storage::ReplicationClient::FinalizeTransactionReplicationInternal() {
return false; return false;
} }
void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) { void InMemoryStorage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
while (true) { while (true) {
auto file_locker = storage_->file_retainer_.AddLocker(); auto file_locker = storage_->file_retainer_.AddLocker();
@ -327,7 +329,7 @@ void Storage::ReplicationClient::RecoverReplica(uint64_t replica_commit) {
} }
} }
uint64_t Storage::ReplicationClient::ReplicateCurrentWal() { uint64_t InMemoryStorage::ReplicationClient::ReplicateCurrentWal() {
const auto &wal_file = storage_->wal_file_; const auto &wal_file = storage_->wal_file_;
auto stream = TransferCurrentWalFile(); auto stream = TransferCurrentWalFile();
stream.AppendFilename(wal_file->Path().filename()); stream.AppendFilename(wal_file->Path().filename());
@ -361,7 +363,7 @@ uint64_t Storage::ReplicationClient::ReplicateCurrentWal() {
/// recovery steps, so we can safely send it to the replica. /// recovery steps, so we can safely send it to the replica.
/// We assume that the property of preserving at least 1 WAL before the snapshot /// We assume that the property of preserving at least 1 WAL before the snapshot
/// is satisfied as we extract the timestamp information from it. /// is satisfied as we extract the timestamp information from it.
std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient::GetRecoverySteps( std::vector<InMemoryStorage::ReplicationClient::RecoveryStep> InMemoryStorage::ReplicationClient::GetRecoverySteps(
const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) { const uint64_t replica_commit, utils::FileRetainer::FileLocker *file_locker) {
// First check if we can recover using the current wal file only // First check if we can recover using the current wal file only
// otherwise save the seq_num of the current wal file // otherwise save the seq_num of the current wal file
@ -502,8 +504,8 @@ std::vector<Storage::ReplicationClient::RecoveryStep> Storage::ReplicationClient
return recovery_steps; return recovery_steps;
} }
Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() { InMemoryStorage::TimestampInfo InMemoryStorage::ReplicationClient::GetTimestampInfo() {
Storage::TimestampInfo info; InMemoryStorage::TimestampInfo info;
info.current_timestamp_of_replica = 0; info.current_timestamp_of_replica = 0;
info.current_number_of_timestamp_behind_master = 0; info.current_number_of_timestamp_behind_master = 0;
@ -531,65 +533,71 @@ Storage::TimestampInfo Storage::ReplicationClient::GetTimestampInfo() {
} }
////// ReplicaStream ////// ////// ReplicaStream //////
Storage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self, InMemoryStorage::ReplicationClient::ReplicaStream::ReplicaStream(ReplicationClient *self,
const uint64_t previous_commit_timestamp, const uint64_t previous_commit_timestamp,
const uint64_t current_seq_num) const uint64_t current_seq_num)
: self_(self), : self_(self),
stream_(self_->rpc_client_->Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) { stream_(self_->rpc_client_->Stream<replication::AppendDeltasRpc>(previous_commit_timestamp, current_seq_num)) {
replication::Encoder encoder{stream_.GetBuilder()}; replication::Encoder encoder{stream_.GetBuilder()};
encoder.WriteString(self_->storage_->epoch_id_); encoder.WriteString(self_->storage_->epoch_id_);
} }
void Storage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex, void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Vertex &vertex,
uint64_t final_commit_timestamp) { uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, self_->storage_->config_.items, delta, vertex, EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, self_->storage_->config_.items, delta, vertex,
final_commit_timestamp); final_commit_timestamp);
} }
void Storage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge, void InMemoryStorage::ReplicationClient::ReplicaStream::AppendDelta(const Delta &delta, const Edge &edge,
uint64_t final_commit_timestamp) { uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, delta, edge, final_commit_timestamp); EncodeDelta(&encoder, &self_->storage_->name_id_mapper_, delta, edge, final_commit_timestamp);
} }
void Storage::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) { void InMemoryStorage::ReplicationClient::ReplicaStream::AppendTransactionEnd(uint64_t final_commit_timestamp) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
EncodeTransactionEnd(&encoder, final_commit_timestamp); EncodeTransactionEnd(&encoder, final_commit_timestamp);
} }
void Storage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation, void InMemoryStorage::ReplicationClient::ReplicaStream::AppendOperation(durability::StorageGlobalOperation operation,
LabelId label, const std::set<PropertyId> &properties, LabelId label,
uint64_t timestamp) { const std::set<PropertyId> &properties,
uint64_t timestamp) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
EncodeOperation(&encoder, &self_->storage_->name_id_mapper_, operation, label, properties, timestamp); EncodeOperation(&encoder, &self_->storage_->name_id_mapper_, operation, label, properties, timestamp);
} }
replication::AppendDeltasRes Storage::ReplicationClient::ReplicaStream::Finalize() { return stream_.AwaitResponse(); } replication::AppendDeltasRes InMemoryStorage::ReplicationClient::ReplicaStream::Finalize() {
return stream_.AwaitResponse();
}
////// CurrentWalHandler ////// ////// CurrentWalHandler //////
Storage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self) InMemoryStorage::ReplicationClient::CurrentWalHandler::CurrentWalHandler(ReplicationClient *self)
: self_(self), stream_(self_->rpc_client_->Stream<replication::CurrentWalRpc>()) {} : self_(self), stream_(self_->rpc_client_->Stream<replication::CurrentWalRpc>()) {}
void Storage::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) { void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFilename(const std::string &filename) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteString(filename); encoder.WriteString(filename);
} }
void Storage::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) { void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendSize(const size_t size) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteUint(size); encoder.WriteUint(size);
} }
void Storage::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) { void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendFileData(utils::InputFile *file) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteFileData(file); encoder.WriteFileData(file);
} }
void Storage::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer, const size_t buffer_size) { void InMemoryStorage::ReplicationClient::CurrentWalHandler::AppendBufferData(const uint8_t *buffer,
const size_t buffer_size) {
replication::Encoder encoder(stream_.GetBuilder()); replication::Encoder encoder(stream_.GetBuilder());
encoder.WriteBuffer(buffer, buffer_size); encoder.WriteBuffer(buffer, buffer_size);
} }
replication::CurrentWalRes Storage::ReplicationClient::CurrentWalHandler::Finalize() { return stream_.AwaitResponse(); } replication::CurrentWalRes InMemoryStorage::ReplicationClient::CurrentWalHandler::Finalize() {
return stream_.AwaitResponse();
}
} // namespace memgraph::storage } // namespace memgraph::storage

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd. // Copyright 2023 Memgraph Ltd.
// //
// Use of this software is governed by the Business Source License // Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -19,6 +19,7 @@
#include "storage/v2/durability/snapshot.hpp" #include "storage/v2/durability/snapshot.hpp"
#include "storage/v2/durability/version.hpp" #include "storage/v2/durability/version.hpp"
#include "storage/v2/durability/wal.hpp" #include "storage/v2/durability/wal.hpp"
#include "storage/v2/inmemory/edge_accessor.hpp"
#include "storage/v2/replication/config.hpp" #include "storage/v2/replication/config.hpp"
#include "storage/v2/transaction.hpp" #include "storage/v2/transaction.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
@ -39,8 +40,8 @@ std::pair<uint64_t, durability::WalDeltaData> ReadDelta(durability::BaseDecoder
}; };
} // namespace } // namespace
Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::Endpoint endpoint, InMemoryStorage::ReplicationServer::ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config) const replication::ReplicationServerConfig &config)
: storage_(storage) { : storage_(storage) {
// Create RPC server. // Create RPC server.
if (config.ssl) { if (config.ssl) {
@ -87,21 +88,21 @@ Storage::ReplicationServer::ReplicationServer(Storage *storage, io::network::End
rpc_server_->Start(); rpc_server_->Start();
} }
void Storage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::HeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::HeartbeatReq req; replication::HeartbeatReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_}; replication::HeartbeatRes res{true, storage_->last_commit_timestamp_.load(), storage_->epoch_id_};
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
void Storage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::FrequentHeartbeatHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::FrequentHeartbeatReq req; replication::FrequentHeartbeatReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
replication::FrequentHeartbeatRes res{true}; replication::FrequentHeartbeatRes res{true};
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::AppendDeltasReq req; replication::AppendDeltasReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
@ -148,7 +149,7 @@ void Storage::ReplicationServer::AppendDeltasHandler(slk::Reader *req_reader, sl
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::SnapshotReq req; replication::SnapshotReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
@ -212,7 +213,7 @@ void Storage::ReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::B
} }
} }
void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::WalFilesReq req; replication::WalFilesReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
@ -231,7 +232,7 @@ void Storage::ReplicationServer::WalFilesHandler(slk::Reader *req_reader, slk::B
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::CurrentWalReq req; replication::CurrentWalReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
@ -245,7 +246,7 @@ void Storage::ReplicationServer::CurrentWalHandler(slk::Reader *req_reader, slk:
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) { void InMemoryStorage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory; const auto temp_wal_directory = std::filesystem::temp_directory_path() / "memgraph" / durability::kWalDirectory;
utils::EnsureDir(temp_wal_directory); utils::EnsureDir(temp_wal_directory);
auto maybe_wal_path = decoder->ReadFile(temp_wal_directory); auto maybe_wal_path = decoder->ReadFile(temp_wal_directory);
@ -288,7 +289,7 @@ void Storage::ReplicationServer::LoadWal(replication::Decoder *decoder) {
} }
} }
void Storage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) { void InMemoryStorage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::Builder *res_builder) {
replication::TimestampReq req; replication::TimestampReq req;
slk::Load(&req, req_reader); slk::Load(&req, req_reader);
@ -296,17 +297,17 @@ void Storage::ReplicationServer::TimestampHandler(slk::Reader *req_reader, slk::
slk::Save(res, res_builder); slk::Save(res, res_builder);
} }
Storage::ReplicationServer::~ReplicationServer() { InMemoryStorage::ReplicationServer::~ReplicationServer() {
if (rpc_server_) { if (rpc_server_) {
rpc_server_->Shutdown(); rpc_server_->Shutdown();
rpc_server_->AwaitShutdown(); rpc_server_->AwaitShutdown();
} }
} }
uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) { uint64_t InMemoryStorage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *decoder) {
auto edge_acc = storage_->edges_.access(); auto edge_acc = storage_->edges_.access();
auto vertex_acc = storage_->vertices_.access(); auto vertex_acc = storage_->vertices_.access();
std::optional<std::pair<uint64_t, storage::Storage::Accessor>> commit_timestamp_and_accessor; std::optional<std::pair<uint64_t, storage::InMemoryStorage::InMemoryAccessor>> commit_timestamp_and_accessor;
auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) { auto get_transaction = [this, &commit_timestamp_and_accessor](uint64_t commit_timestamp) {
if (!commit_timestamp_and_accessor) { if (!commit_timestamp_and_accessor) {
commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access()); commit_timestamp_and_accessor.emplace(commit_timestamp, storage_->Access());
@ -408,7 +409,7 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
if (edges.HasError()) throw utils::BasicException("Invalid transaction!"); if (edges.HasError()) throw utils::BasicException("Invalid transaction!");
if (edges->size() != 1) throw utils::BasicException("Invalid transaction!"); if (edges->size() != 1) throw utils::BasicException("Invalid transaction!");
auto &edge = (*edges)[0]; auto &edge = (*edges)[0];
auto ret = transaction->DeleteEdge(&edge); auto ret = transaction->DeleteEdge(edge.get());
if (ret.HasError()) throw utils::BasicException("Invalid transaction!"); if (ret.HasError()) throw utils::BasicException("Invalid transaction!");
break; break;
} }
@ -466,14 +467,14 @@ uint64_t Storage::ReplicationServer::ReadAndApplyDelta(durability::BaseDecoder *
// type and invalid from/to pointers because we don't know them // type and invalid from/to pointers because we don't know them
// here, but that isn't an issue because we won't use that part of // here, but that isn't an issue because we won't use that part of
// the API here. // the API here.
auto ea = EdgeAccessor{edge_ref, auto ea = InMemoryEdgeAccessor{edge_ref,
EdgeTypeId::FromUint(0UL), EdgeTypeId::FromUint(0UL),
nullptr, nullptr,
nullptr, nullptr,
&transaction->transaction_, &transaction->transaction_,
&storage_->indices_, &storage_->indices_,
&storage_->constraints_, &storage_->constraints_,
storage_->config_.items}; storage_->config_.items};
auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property), auto ret = ea.SetProperty(transaction->NameToProperty(delta.vertex_edge_set_property.property),
delta.vertex_edge_set_property.value); delta.vertex_edge_set_property.value);

View File

@ -11,13 +11,13 @@
#pragma once #pragma once
#include "storage/v2/storage.hpp" #include "storage/v2/inmemory/storage.hpp"
namespace memgraph::storage { namespace memgraph::storage {
class InMemoryStorage::ReplicationServer { class InMemoryStorage::ReplicationServer {
public: public:
explicit ReplicationServer(Storage *storage, io::network::Endpoint endpoint, explicit ReplicationServer(InMemoryStorage *storage, io::network::Endpoint endpoint,
const replication::ReplicationServerConfig &config); const replication::ReplicationServerConfig &config);
ReplicationServer(const ReplicationServer &) = delete; ReplicationServer(const ReplicationServer &) = delete;
ReplicationServer(ReplicationServer &&) = delete; ReplicationServer(ReplicationServer &&) = delete;

248
src/storage/v2/storage.cpp Normal file
View File

@ -0,0 +1,248 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "storage/v2/storage.hpp"
#include "storage/v2/inmemory/storage.hpp"
namespace memgraph::storage {
AllVerticesIterable::Iterator::Iterator(AllVerticesIterable *self, utils::SkipList<Vertex>::Iterator it)
: self_(self),
it_(AdvanceToVisibleVertex(it, self->vertices_accessor_.end(), &self->vertex_, self->transaction_, self->view_,
self->indices_, self_->constraints_, self->config_)) {}
VertexAccessor *AllVerticesIterable::Iterator::operator*() const { return self_->vertex_.get(); }
AllVerticesIterable::Iterator &AllVerticesIterable::Iterator::operator++() {
++it_;
it_ = AdvanceToVisibleVertex(it_, self_->vertices_accessor_.end(), &self_->vertex_, self_->transaction_, self_->view_,
self_->indices_, self_->constraints_, self_->config_);
return *this;
}
VerticesIterable::VerticesIterable(AllVerticesIterable vertices) : type_(Type::ALL) {
new (&all_vertices_) AllVerticesIterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(LabelIndex::Iterable vertices) : type_(Type::BY_LABEL) {
new (&vertices_by_label_) LabelIndex::Iterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(LabelPropertyIndex::Iterable vertices) : type_(Type::BY_LABEL_PROPERTY) {
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(vertices));
}
VerticesIterable::VerticesIterable(VerticesIterable &&other) noexcept : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_vertices_) AllVerticesIterable(std::move(other.all_vertices_));
break;
case Type::BY_LABEL:
new (&vertices_by_label_) LabelIndex::Iterable(std::move(other.vertices_by_label_));
break;
case Type::BY_LABEL_PROPERTY:
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(other.vertices_by_label_property_));
break;
}
}
VerticesIterable &VerticesIterable::operator=(VerticesIterable &&other) noexcept {
switch (type_) {
case Type::ALL:
all_vertices_.AllVerticesIterable::~AllVerticesIterable();
break;
case Type::BY_LABEL:
vertices_by_label_.LabelIndex::Iterable::~Iterable();
break;
case Type::BY_LABEL_PROPERTY:
vertices_by_label_property_.LabelPropertyIndex::Iterable::~Iterable();
break;
}
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_vertices_) AllVerticesIterable(std::move(other.all_vertices_));
break;
case Type::BY_LABEL:
new (&vertices_by_label_) LabelIndex::Iterable(std::move(other.vertices_by_label_));
break;
case Type::BY_LABEL_PROPERTY:
new (&vertices_by_label_property_) LabelPropertyIndex::Iterable(std::move(other.vertices_by_label_property_));
break;
}
return *this;
}
VerticesIterable::~VerticesIterable() {
switch (type_) {
case Type::ALL:
all_vertices_.AllVerticesIterable::~AllVerticesIterable();
break;
case Type::BY_LABEL:
vertices_by_label_.LabelIndex::Iterable::~Iterable();
break;
case Type::BY_LABEL_PROPERTY:
vertices_by_label_property_.LabelPropertyIndex::Iterable::~Iterable();
break;
}
}
VerticesIterable::Iterator VerticesIterable::begin() {
switch (type_) {
case Type::ALL:
return Iterator(all_vertices_.begin());
case Type::BY_LABEL:
return Iterator(vertices_by_label_.begin());
case Type::BY_LABEL_PROPERTY:
return Iterator(vertices_by_label_property_.begin());
}
}
VerticesIterable::Iterator VerticesIterable::end() {
switch (type_) {
case Type::ALL:
return Iterator(all_vertices_.end());
case Type::BY_LABEL:
return Iterator(vertices_by_label_.end());
case Type::BY_LABEL_PROPERTY:
return Iterator(vertices_by_label_property_.end());
}
}
VerticesIterable::Iterator::Iterator(AllVerticesIterable::Iterator it) : type_(Type::ALL) {
new (&all_it_) AllVerticesIterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(LabelIndex::Iterable::Iterator it) : type_(Type::BY_LABEL) {
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(LabelPropertyIndex::Iterable::Iterator it) : type_(Type::BY_LABEL_PROPERTY) {
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(it));
}
VerticesIterable::Iterator::Iterator(const VerticesIterable::Iterator &other) : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(other.all_it_);
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(other.by_label_it_);
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(other.by_label_property_it_);
break;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator=(const VerticesIterable::Iterator &other) {
Destroy();
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(other.all_it_);
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(other.by_label_it_);
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(other.by_label_property_it_);
break;
}
return *this;
}
VerticesIterable::Iterator::Iterator(VerticesIterable::Iterator &&other) noexcept : type_(other.type_) {
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(std::move(other.all_it_));
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(other.by_label_it_));
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(other.by_label_property_it_));
break;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator=(VerticesIterable::Iterator &&other) noexcept {
Destroy();
type_ = other.type_;
switch (other.type_) {
case Type::ALL:
new (&all_it_) AllVerticesIterable::Iterator(std::move(other.all_it_));
break;
case Type::BY_LABEL:
new (&by_label_it_) LabelIndex::Iterable::Iterator(std::move(other.by_label_it_));
break;
case Type::BY_LABEL_PROPERTY:
new (&by_label_property_it_) LabelPropertyIndex::Iterable::Iterator(std::move(other.by_label_property_it_));
break;
}
return *this;
}
VerticesIterable::Iterator::~Iterator() { Destroy(); }
void VerticesIterable::Iterator::Destroy() noexcept {
switch (type_) {
case Type::ALL:
all_it_.AllVerticesIterable::Iterator::~Iterator();
break;
case Type::BY_LABEL:
by_label_it_.LabelIndex::Iterable::Iterator::~Iterator();
break;
case Type::BY_LABEL_PROPERTY:
by_label_property_it_.LabelPropertyIndex::Iterable::Iterator::~Iterator();
break;
}
}
VertexAccessor VerticesIterable::Iterator::operator*() const {
switch (type_) {
case Type::ALL:
return *all_it_;
case Type::BY_LABEL:
return *by_label_it_;
case Type::BY_LABEL_PROPERTY:
return *by_label_property_it_;
}
}
VerticesIterable::Iterator &VerticesIterable::Iterator::operator++() {
switch (type_) {
case Type::ALL:
++all_it_;
break;
case Type::BY_LABEL:
++by_label_it_;
break;
case Type::BY_LABEL_PROPERTY:
++by_label_property_it_;
break;
}
return *this;
}
bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
switch (type_) {
case Type::ALL:
return all_it_ == other.all_it_;
case Type::BY_LABEL:
return by_label_it_ == other.by_label_it_;
case Type::BY_LABEL_PROPERTY:
return by_label_property_it_ == other.by_label_property_it_;
}
}
} // namespace memgraph::storage

View File

@ -23,21 +23,55 @@
namespace memgraph::storage { namespace memgraph::storage {
class VerticesIterableImpl;
struct Transaction; struct Transaction;
class EdgeAccessor; class EdgeAccessor;
/// Structure used to return information about existing indices in the storage. // The storage is based on this paper:
struct IndicesInfo { // https://db.in.tum.de/~muehlbau/papers/mvcc.pdf
std::vector<LabelId> label; // The paper implements a fully serializable storage, in our implementation we
std::vector<std::pair<LabelId, PropertyId>> label_property; // only implement snapshot isolation for transactions.
};
/// Structure used to return information about existing constraints in the /// Iterable for iterating through all vertices of a Storage.
/// storage. ///
struct ConstraintsInfo { /// An instance of this will be usually be wrapped inside VerticesIterable for
std::vector<std::pair<LabelId, PropertyId>> existence; /// generic, public use.
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique; class AllVerticesIterable final {
utils::SkipList<Vertex>::Accessor vertices_accessor_;
Transaction *transaction_;
View view_;
Indices *indices_;
Constraints *constraints_;
Config::Items config_;
std::unique_ptr<VertexAccessor> vertex_;
public:
class Iterator final {
AllVerticesIterable *self_;
utils::SkipList<Vertex>::Iterator it_;
public:
Iterator(AllVerticesIterable *self, utils::SkipList<Vertex>::Iterator it);
VertexAccessor *operator*() const;
Iterator &operator++();
bool operator==(const Iterator &other) const { return self_ == other.self_ && it_ == other.it_; }
bool operator!=(const Iterator &other) const { return !(*this == other); }
};
AllVerticesIterable(utils::SkipList<Vertex>::Accessor vertices_accessor, Transaction *transaction, View view,
Indices *indices, Constraints *constraints, Config::Items config)
: vertices_accessor_(std::move(vertices_accessor)),
transaction_(transaction),
view_(view),
indices_(indices),
constraints_(constraints),
config_(config) {}
Iterator begin() { return Iterator(this, vertices_accessor_.begin()); }
Iterator end() { return Iterator(this, vertices_accessor_.end()); }
}; };
/// Generic access to different kinds of vertex iterations. /// Generic access to different kinds of vertex iterations.
@ -48,10 +82,16 @@ class VerticesIterable final {
enum class Type { ALL, BY_LABEL, BY_LABEL_PROPERTY }; enum class Type { ALL, BY_LABEL, BY_LABEL_PROPERTY };
Type type_; Type type_;
VerticesIterableImpl *impl_; union {
AllVerticesIterable all_vertices_;
LabelIndex::Iterable vertices_by_label_;
LabelPropertyIndex::Iterable vertices_by_label_property_;
};
public: public:
explicit VerticesIterable(VerticesIterableImpl *); explicit VerticesIterable(AllVerticesIterable);
explicit VerticesIterable(LabelIndex::Iterable);
explicit VerticesIterable(LabelPropertyIndex::Iterable);
VerticesIterable(const VerticesIterable &) = delete; VerticesIterable(const VerticesIterable &) = delete;
VerticesIterable &operator=(const VerticesIterable &) = delete; VerticesIterable &operator=(const VerticesIterable &) = delete;
@ -63,12 +103,18 @@ class VerticesIterable final {
class Iterator final { class Iterator final {
Type type_; Type type_;
VerticesIterable::Iterator *impl_; union {
AllVerticesIterable::Iterator all_it_;
LabelIndex::Iterable::Iterator by_label_it_;
LabelPropertyIndex::Iterable::Iterator by_label_property_it_;
};
void Destroy() noexcept; void Destroy() noexcept;
public: public:
explicit Iterator(VerticesIterable::Iterator *); explicit Iterator(AllVerticesIterable::Iterator);
explicit Iterator(LabelIndex::Iterable::Iterator);
explicit Iterator(LabelPropertyIndex::Iterable::Iterator);
Iterator(const Iterator &); Iterator(const Iterator &);
Iterator &operator=(const Iterator &); Iterator &operator=(const Iterator &);
@ -90,8 +136,22 @@ class VerticesIterable final {
Iterator end(); Iterator end();
}; };
/// Structure used to return information about existing indices in the storage.
struct IndicesInfo {
std::vector<LabelId> label;
std::vector<std::pair<LabelId, PropertyId>> label_property;
};
/// Structure used to return information about existing constraints in the
/// storage.
struct ConstraintsInfo {
std::vector<std::pair<LabelId, PropertyId>> existence;
std::vector<std::pair<LabelId, std::set<PropertyId>>> unique;
};
class Accessor { class Accessor {
public: public:
Accessor() {}
Accessor(const Accessor &) = delete; Accessor(const Accessor &) = delete;
Accessor &operator=(const Accessor &) = delete; Accessor &operator=(const Accessor &) = delete;
Accessor &operator=(Accessor &&other) = delete; Accessor &operator=(Accessor &&other) = delete;
@ -103,9 +163,9 @@ class Accessor {
virtual ~Accessor(); virtual ~Accessor();
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual VertexAccessor CreateVertex() = 0; virtual std::unique_ptr<VertexAccessor> CreateVertex() = 0;
virtual std::optional<VertexAccessor> FindVertex(Gid gid, View view) = 0; virtual std::unique_ptr<VertexAccessor> FindVertex(Gid gid, View view) = 0;
virtual VerticesIterable Vertices(View view) = 0; virtual VerticesIterable Vertices(View view) = 0;
@ -156,19 +216,20 @@ class Accessor {
/// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise /// @return Accessor to the deleted vertex if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual Result<std::optional<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) = 0; virtual Result<std::unique_ptr<VertexAccessor>> DeleteVertex(VertexAccessor *vertex) = 0;
/// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise /// @return Accessor to the deleted vertex and deleted edges if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual Result<std::optional<std::pair<VertexAccessor, std::vector<EdgeAccessor>>>> DetachDeleteVertex( virtual Result<std::optional<std::pair<std::unique_ptr<VertexAccessor>, std::vector<std::unique_ptr<EdgeAccessor>>>>>
VertexAccessor *vertex) = 0; DetachDeleteVertex(VertexAccessor *vertex) = 0;
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to, EdgeTypeId edge_type) = 0; virtual Result<std::unique_ptr<EdgeAccessor>> CreateEdge(VertexAccessor *from, VertexAccessor *to,
EdgeTypeId edge_type) = 0;
/// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise /// Accessor to the deleted edge if a deletion took place, std::nullopt otherwise
/// @throw std::bad_alloc /// @throw std::bad_alloc
virtual Result<std::optional<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) = 0; virtual Result<std::unique_ptr<EdgeAccessor>> DeleteEdge(EdgeAccessor *edge) = 0;
virtual const std::string &LabelToName(LabelId label) const = 0; virtual const std::string &LabelToName(LabelId label) const = 0;
virtual const std::string &PropertyToName(PropertyId property) const = 0; virtual const std::string &PropertyToName(PropertyId property) const = 0;

View File

@ -21,7 +21,11 @@ std::unique_ptr<VertexAccessor> VertexAccessor::Create(Vertex *vertex, Transacti
return InMemoryVertexAccessor::Create(vertex, transaction, indices, constraints, config, view); return InMemoryVertexAccessor::Create(vertex, transaction, indices, constraints, config, view);
} }
Result<std::vector<EdgeAccessor>> VertexAccessor::InEdges(View view) const { return InEdges(view, {}, nullptr); } Result<std::vector<std::unique_ptr<EdgeAccessor>>> VertexAccessor::InEdges(View view) const {
Result<std::vector<EdgeAccessor>> VertexAccessor::OutEdges(View view) const { return OutEdges(view, {}, nullptr); } return InEdges(view, {}, nullptr);
}
Result<std::vector<std::unique_ptr<EdgeAccessor>>> VertexAccessor::OutEdges(View view) const {
return OutEdges(view, {}, nullptr);
}
} // namespace memgraph::storage } // namespace memgraph::storage

View File

@ -81,18 +81,20 @@ class VertexAccessor {
/// @throw std::bad_alloc /// @throw std::bad_alloc
/// @throw std::length_error if the resulting vector exceeds /// @throw std::length_error if the resulting vector exceeds
/// std::vector::max_size(). /// std::vector::max_size().
virtual Result<std::vector<EdgeAccessor>> InEdges(View view, const std::vector<EdgeTypeId> &edge_types, virtual Result<std::vector<std::unique_ptr<EdgeAccessor>>> InEdges(View view,
const VertexAccessor *destination) const = 0; const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const = 0;
Result<std::vector<EdgeAccessor>> InEdges(View view) const; Result<std::vector<std::unique_ptr<EdgeAccessor>>> InEdges(View view) const;
/// @throw std::bad_alloc /// @throw std::bad_alloc
/// @throw std::length_error if the resulting vector exceeds /// @throw std::length_error if the resulting vector exceeds
/// std::vector::max_size(). /// std::vector::max_size().
virtual Result<std::vector<EdgeAccessor>> OutEdges(View view, const std::vector<EdgeTypeId> &edge_types, virtual Result<std::vector<std::unique_ptr<EdgeAccessor>>> OutEdges(View view,
const VertexAccessor *destination) const = 0; const std::vector<EdgeTypeId> &edge_types,
const VertexAccessor *destination) const = 0;
Result<std::vector<EdgeAccessor>> OutEdges(View view) const; Result<std::vector<std::unique_ptr<EdgeAccessor>>> OutEdges(View view) const;
virtual Result<size_t> InDegree(View view) const = 0; virtual Result<size_t> InDegree(View view) const = 0;