Implement edges in storage v2
Summary: This change implements full edges support in storage v2. Edges can be created and deleted. Support for detach-deleting vertices is added and regular vertex deletion verifies existance of edges. Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2180
This commit is contained in:
parent
a2c5b5eac6
commit
5b8e7ff432
@ -1,4 +1,5 @@
|
||||
set(storage_v2_src_files
|
||||
edge_accessor.cpp
|
||||
vertex_accessor.cpp
|
||||
storage.cpp)
|
||||
|
||||
|
@ -6,21 +6,39 @@
|
||||
|
||||
namespace storage {
|
||||
|
||||
// Forward declarations because we only store pointers here.
|
||||
struct Vertex;
|
||||
struct Edge;
|
||||
|
||||
struct Delta {
|
||||
enum class Action {
|
||||
// Used for both Vertex and Edge
|
||||
DELETE_OBJECT,
|
||||
RECREATE_OBJECT,
|
||||
SET_PROPERTY,
|
||||
|
||||
// Used only for Vertex
|
||||
ADD_LABEL,
|
||||
REMOVE_LABEL,
|
||||
SET_PROPERTY,
|
||||
ADD_IN_EDGE,
|
||||
ADD_OUT_EDGE,
|
||||
REMOVE_IN_EDGE,
|
||||
REMOVE_OUT_EDGE,
|
||||
};
|
||||
|
||||
// Used for both Vertex and Edge
|
||||
struct DeleteObjectTag {};
|
||||
struct RecreateObjectTag {};
|
||||
struct AddLabelTag {};
|
||||
struct RemoveLabelTag {};
|
||||
struct SetPropertyTag {};
|
||||
|
||||
// Used only for Vertex
|
||||
struct AddInEdgeTag {};
|
||||
struct AddOutEdgeTag {};
|
||||
struct RemoveInEdgeTag {};
|
||||
struct RemoveOutEdgeTag {};
|
||||
|
||||
Delta(DeleteObjectTag, std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::DELETE_OBJECT),
|
||||
timestamp(timestamp),
|
||||
@ -53,6 +71,34 @@ struct Delta {
|
||||
command_id(command_id),
|
||||
property({key, value}) {}
|
||||
|
||||
Delta(AddInEdgeTag, uint64_t edge_type, Vertex *vertex, Edge *edge,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::ADD_IN_EDGE),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, vertex, edge}) {}
|
||||
|
||||
Delta(AddOutEdgeTag, uint64_t edge_type, Vertex *vertex, Edge *edge,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::ADD_OUT_EDGE),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, vertex, edge}) {}
|
||||
|
||||
Delta(RemoveInEdgeTag, uint64_t edge_type, Vertex *vertex, Edge *edge,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::REMOVE_IN_EDGE),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, vertex, edge}) {}
|
||||
|
||||
Delta(RemoveOutEdgeTag, uint64_t edge_type, Vertex *vertex, Edge *edge,
|
||||
std::atomic<uint64_t> *timestamp, uint64_t command_id)
|
||||
: action(Action::REMOVE_OUT_EDGE),
|
||||
timestamp(timestamp),
|
||||
command_id(command_id),
|
||||
vertex_edge({edge_type, vertex, edge}) {}
|
||||
|
||||
Delta(Delta &&other) noexcept
|
||||
: action(other.action),
|
||||
timestamp(other.timestamp),
|
||||
@ -71,6 +117,12 @@ struct Delta {
|
||||
property.key = other.property.key;
|
||||
new (&property.value) PropertyValue(std::move(other.property.value));
|
||||
break;
|
||||
case Action::ADD_IN_EDGE:
|
||||
case Action::ADD_OUT_EDGE:
|
||||
case Action::REMOVE_IN_EDGE:
|
||||
case Action::REMOVE_OUT_EDGE:
|
||||
vertex_edge = other.vertex_edge;
|
||||
break;
|
||||
}
|
||||
|
||||
// reset the action of other
|
||||
@ -98,6 +150,11 @@ struct Delta {
|
||||
uint64_t key;
|
||||
storage::PropertyValue value;
|
||||
} property;
|
||||
struct {
|
||||
uint64_t edge_type;
|
||||
Vertex *vertex;
|
||||
Edge *edge;
|
||||
} vertex_edge;
|
||||
};
|
||||
|
||||
private:
|
||||
@ -107,6 +164,10 @@ struct Delta {
|
||||
case Action::RECREATE_OBJECT:
|
||||
case Action::ADD_LABEL:
|
||||
case Action::REMOVE_LABEL:
|
||||
case Action::ADD_IN_EDGE:
|
||||
case Action::ADD_OUT_EDGE:
|
||||
case Action::REMOVE_IN_EDGE:
|
||||
case Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
case Action::SET_PROPERTY:
|
||||
property.value.~PropertyValue();
|
||||
|
47
src/storage/v2/edge.hpp
Normal file
47
src/storage/v2/edge.hpp
Normal file
@ -0,0 +1,47 @@
|
||||
#pragma once
|
||||
|
||||
#include <limits>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "utils/spin_lock.hpp"
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/gid.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
struct Vertex;
|
||||
|
||||
struct Edge {
|
||||
Edge(Gid gid, Delta *delta) : gid(gid), deleted(false), delta(delta) {
|
||||
CHECK(delta->action == Delta::Action::DELETE_OBJECT)
|
||||
<< "Edge must be created with an initial DELETE_OBJECT delta!";
|
||||
}
|
||||
|
||||
Gid gid;
|
||||
|
||||
// TODO: add
|
||||
// std::unordered_map<uint64_t, storage::PropertyValue> properties;
|
||||
|
||||
utils::SpinLock lock;
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
// uint16_t PAD;
|
||||
|
||||
Delta *delta;
|
||||
};
|
||||
|
||||
inline bool operator==(const Edge &first, const Edge &second) {
|
||||
return first.gid == second.gid;
|
||||
}
|
||||
inline bool operator<(const Edge &first, const Edge &second) {
|
||||
return first.gid < second.gid;
|
||||
}
|
||||
inline bool operator==(const Edge &first, const Gid &second) {
|
||||
return first.gid == second;
|
||||
}
|
||||
inline bool operator<(const Edge &first, const Gid &second) {
|
||||
return first.gid < second;
|
||||
}
|
||||
|
||||
} // namespace storage
|
18
src/storage/v2/edge_accessor.cpp
Normal file
18
src/storage/v2/edge_accessor.cpp
Normal file
@ -0,0 +1,18 @@
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/vertex_accessor.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
VertexAccessor EdgeAccessor::FromVertex() {
|
||||
return VertexAccessor{from_vertex_, transaction_};
|
||||
}
|
||||
|
||||
VertexAccessor EdgeAccessor::ToVertex() {
|
||||
return VertexAccessor{to_vertex_, transaction_};
|
||||
}
|
||||
|
||||
} // namespace storage
|
51
src/storage/v2/edge_accessor.hpp
Normal file
51
src/storage/v2/edge_accessor.hpp
Normal file
@ -0,0 +1,51 @@
|
||||
#pragma once
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "storage/v2/edge.hpp"
|
||||
|
||||
#include "storage/v2/result.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
|
||||
namespace storage {
|
||||
|
||||
struct Vertex;
|
||||
class VertexAccessor;
|
||||
class Storage;
|
||||
|
||||
class EdgeAccessor final {
|
||||
private:
|
||||
friend class Storage;
|
||||
|
||||
public:
|
||||
EdgeAccessor(Edge *edge, uint64_t edge_type, Vertex *from_vertex,
|
||||
Vertex *to_vertex, Transaction *transaction)
|
||||
: edge_(edge),
|
||||
edge_type_(edge_type),
|
||||
from_vertex_(from_vertex),
|
||||
to_vertex_(to_vertex),
|
||||
transaction_(transaction) {}
|
||||
|
||||
VertexAccessor FromVertex();
|
||||
|
||||
VertexAccessor ToVertex();
|
||||
|
||||
uint64_t EdgeType() const { return edge_type_; }
|
||||
|
||||
Gid Gid() const { return edge_->gid; }
|
||||
|
||||
bool operator==(const EdgeAccessor &other) const {
|
||||
return edge_ == other.edge_ && transaction_ == other.transaction_;
|
||||
}
|
||||
bool operator!=(const EdgeAccessor &other) const { return !(*this == other); }
|
||||
|
||||
private:
|
||||
Edge *edge_;
|
||||
uint64_t edge_type_;
|
||||
Vertex *from_vertex_;
|
||||
Vertex *to_vertex_;
|
||||
Transaction *transaction_;
|
||||
};
|
||||
|
||||
} // namespace storage
|
@ -1,8 +1,12 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -47,15 +51,16 @@ inline void ApplyDeltasForRead(Transaction *transaction, Delta *delta,
|
||||
}
|
||||
}
|
||||
|
||||
/// This function prepares the Vertex object for a write. It checks whether
|
||||
/// there are any serialization errors in the process (eg. the object can't be
|
||||
/// written to from this transaction because it is being written to from another
|
||||
/// This function prepares the object for a write. It checks whether there are
|
||||
/// any serialization errors in the process (eg. the object can't be written to
|
||||
/// from this transaction because it is being written to from another
|
||||
/// transaction) and returns a `bool` value indicating whether the caller can
|
||||
/// proceed with a write operation.
|
||||
inline bool PrepareForWrite(Transaction *transaction, Vertex *vertex) {
|
||||
if (vertex->delta == nullptr) return true;
|
||||
template <typename TObj>
|
||||
inline bool PrepareForWrite(Transaction *transaction, TObj *object) {
|
||||
if (object->delta == nullptr) return true;
|
||||
|
||||
auto ts = vertex->delta->timestamp->load(std::memory_order_acquire);
|
||||
auto ts = object->delta->timestamp->load(std::memory_order_acquire);
|
||||
if (ts == transaction->transaction_id || ts < transaction->start_timestamp) {
|
||||
return true;
|
||||
}
|
||||
@ -74,25 +79,33 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
||||
transaction->command_id);
|
||||
}
|
||||
|
||||
/// This function creates a delta in the transaction for the Vertex object and
|
||||
/// links the delta into the Vertex's delta list. It also adds the Vertex to the
|
||||
/// transaction's modified vertices list.
|
||||
template <class... Args>
|
||||
inline void CreateAndLinkDelta(Transaction *transaction, Vertex *vertex,
|
||||
/// This function creates a delta in the transaction for the object and links
|
||||
/// the delta into the object's delta list. It also adds the object to the
|
||||
/// transaction's modified objects list.
|
||||
template <typename TObj, class... Args>
|
||||
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
|
||||
Args &&... args) {
|
||||
auto delta = &transaction->deltas.emplace_back(std::forward<Args>(args)...,
|
||||
&transaction->commit_timestamp,
|
||||
transaction->command_id);
|
||||
|
||||
if (vertex->delta) {
|
||||
vertex->delta->prev = delta;
|
||||
if (object->delta) {
|
||||
object->delta->prev = delta;
|
||||
}
|
||||
delta->next.store(vertex->delta, std::memory_order_release);
|
||||
vertex->delta = delta;
|
||||
delta->next.store(object->delta, std::memory_order_release);
|
||||
object->delta = delta;
|
||||
|
||||
if (transaction->modified_vertices.empty() ||
|
||||
transaction->modified_vertices.back() != vertex) {
|
||||
transaction->modified_vertices.push_back(vertex);
|
||||
if constexpr (std::is_same_v<TObj, Vertex>) {
|
||||
if (transaction->modified_vertices.empty() ||
|
||||
transaction->modified_vertices.back() != object) {
|
||||
transaction->modified_vertices.push_back(object);
|
||||
}
|
||||
}
|
||||
if constexpr (std::is_same_v<TObj, Edge>) {
|
||||
if (transaction->modified_edges.empty() ||
|
||||
transaction->modified_edges.back() != object) {
|
||||
transaction->modified_edges.push_back(object);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ namespace storage {
|
||||
enum class Error : uint8_t {
|
||||
SERIALIZATION_ERROR,
|
||||
DELETED_OBJECT,
|
||||
VERTEX_HAS_EDGES,
|
||||
};
|
||||
|
||||
template <typename TReturn>
|
||||
|
@ -64,7 +64,7 @@ VertexAccessor Storage::Accessor::CreateVertex() {
|
||||
CHECK(inserted) << "The vertex must be inserted here!";
|
||||
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
|
||||
transaction_->modified_vertices.push_back(&*it);
|
||||
return VertexAccessor::Create(&*it, transaction_, View::NEW).value();
|
||||
return VertexAccessor{&*it, transaction_};
|
||||
}
|
||||
|
||||
std::optional<VertexAccessor> Storage::Accessor::FindVertex(Gid gid,
|
||||
@ -88,13 +88,204 @@ Result<bool> Storage::Accessor::DeleteVertex(VertexAccessor *vertex) {
|
||||
|
||||
if (vertex_ptr->deleted) return Result<bool>{false};
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||
if (!vertex_ptr->in_edges.empty() || !vertex_ptr->out_edges.empty())
|
||||
return Result<bool>{Error::VERTEX_HAS_EDGES};
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||
vertex_ptr->deleted = true;
|
||||
|
||||
return Result<bool>{true};
|
||||
}
|
||||
|
||||
Result<bool> Storage::Accessor::DetachDeleteVertex(VertexAccessor *vertex) {
|
||||
CHECK(vertex->transaction_ == transaction_)
|
||||
<< "VertexAccessor must be from the same transaction as the storage "
|
||||
"accessor when deleting a vertex!";
|
||||
auto vertex_ptr = vertex->vertex_;
|
||||
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> in_edges;
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> out_edges;
|
||||
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, vertex_ptr))
|
||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||
|
||||
if (vertex_ptr->deleted) return Result<bool>{false};
|
||||
|
||||
in_edges = vertex_ptr->in_edges;
|
||||
out_edges = vertex_ptr->out_edges;
|
||||
}
|
||||
|
||||
for (const auto &item : in_edges) {
|
||||
auto [edge_type, from_vertex, edge] = item;
|
||||
EdgeAccessor e{edge, edge_type, from_vertex, vertex_ptr, transaction_};
|
||||
auto ret = DeleteEdge(&e);
|
||||
if (ret.IsError()) {
|
||||
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
||||
<< "Invalid database state!";
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
for (const auto &item : out_edges) {
|
||||
auto [edge_type, to_vertex, edge] = item;
|
||||
EdgeAccessor e{edge, edge_type, vertex_ptr, to_vertex, transaction_};
|
||||
auto ret = DeleteEdge(&e);
|
||||
if (ret.IsError()) {
|
||||
CHECK(ret.GetError() == Error::SERIALIZATION_ERROR)
|
||||
<< "Invalid database state!";
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
|
||||
|
||||
// We need to check again for serialization errors because we unlocked the
|
||||
// vertex. Some other transaction could have modified the vertex in the
|
||||
// meantime if we didn't have any edges to delete.
|
||||
|
||||
if (!PrepareForWrite(transaction_, vertex_ptr))
|
||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||
|
||||
CHECK(!vertex_ptr->deleted) << "Invalid database state!";
|
||||
|
||||
CreateAndLinkDelta(transaction_, vertex_ptr, Delta::RecreateObjectTag());
|
||||
vertex_ptr->deleted = true;
|
||||
|
||||
return Result<bool>{true};
|
||||
}
|
||||
|
||||
Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
|
||||
VertexAccessor *to,
|
||||
uint64_t edge_type) {
|
||||
CHECK(from->transaction_ == to->transaction_)
|
||||
<< "VertexAccessors must be from the same transaction when creating "
|
||||
"an edge!";
|
||||
CHECK(from->transaction_ == transaction_)
|
||||
<< "VertexAccessors must be from the same transaction in when "
|
||||
"creating an edge!";
|
||||
|
||||
auto from_vertex = from->vertex_;
|
||||
auto to_vertex = to->vertex_;
|
||||
|
||||
// Obtain the locks by `gid` order to avoid lock cycles.
|
||||
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock,
|
||||
std::defer_lock);
|
||||
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
|
||||
if (from_vertex->gid < to_vertex->gid) {
|
||||
guard_from.lock();
|
||||
guard_to.lock();
|
||||
} else if (from_vertex->gid > to_vertex->gid) {
|
||||
guard_to.lock();
|
||||
guard_from.lock();
|
||||
} else {
|
||||
// The vertices are the same vertex, only lock one.
|
||||
guard_from.lock();
|
||||
}
|
||||
|
||||
if (!PrepareForWrite(transaction_, from_vertex))
|
||||
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
||||
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
||||
|
||||
if (to_vertex != from_vertex) {
|
||||
if (!PrepareForWrite(transaction_, to_vertex))
|
||||
return Result<EdgeAccessor>{Error::SERIALIZATION_ERROR};
|
||||
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
||||
}
|
||||
|
||||
auto gid = storage_->edge_id_.fetch_add(1, std::memory_order_acq_rel);
|
||||
auto acc = storage_->edges_.access();
|
||||
auto delta = CreateDeleteObjectDelta(transaction_);
|
||||
auto [it, inserted] = acc.insert(Edge{storage::Gid::FromUint(gid), delta});
|
||||
CHECK(inserted) << "The edge must be inserted here!";
|
||||
CHECK(it != acc.end()) << "Invalid Edge accessor!";
|
||||
auto edge = &*it;
|
||||
transaction_->modified_edges.push_back(edge);
|
||||
|
||||
CreateAndLinkDelta(transaction_, from_vertex, Delta::RemoveOutEdgeTag(),
|
||||
edge_type, to_vertex, edge);
|
||||
from_vertex->out_edges.emplace_back(edge_type, to_vertex, edge);
|
||||
|
||||
CreateAndLinkDelta(transaction_, to_vertex, Delta::RemoveInEdgeTag(),
|
||||
edge_type, from_vertex, edge);
|
||||
to_vertex->in_edges.emplace_back(edge_type, from_vertex, edge);
|
||||
|
||||
return Result<EdgeAccessor>{
|
||||
EdgeAccessor{edge, edge_type, from_vertex, to_vertex, transaction_}};
|
||||
}
|
||||
|
||||
Result<bool> Storage::Accessor::DeleteEdge(EdgeAccessor *edge) {
|
||||
CHECK(edge->transaction_ == transaction_)
|
||||
<< "EdgeAccessor must be from the same transaction as the storage "
|
||||
"accessor when deleting an edge!";
|
||||
auto edge_ptr = edge->edge_;
|
||||
auto edge_type = edge->edge_type_;
|
||||
|
||||
std::lock_guard<utils::SpinLock> guard(edge_ptr->lock);
|
||||
|
||||
if (!PrepareForWrite(transaction_, edge_ptr))
|
||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||
|
||||
if (edge_ptr->deleted) return Result<bool>{false};
|
||||
|
||||
auto from_vertex = edge->from_vertex_;
|
||||
auto to_vertex = edge->to_vertex_;
|
||||
|
||||
// Obtain the locks by `gid` order to avoid lock cycles.
|
||||
std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock,
|
||||
std::defer_lock);
|
||||
std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
|
||||
if (from_vertex->gid < to_vertex->gid) {
|
||||
guard_from.lock();
|
||||
guard_to.lock();
|
||||
} else if (from_vertex->gid > to_vertex->gid) {
|
||||
guard_to.lock();
|
||||
guard_from.lock();
|
||||
} else {
|
||||
// The vertices are the same vertex, only lock one.
|
||||
guard_from.lock();
|
||||
}
|
||||
|
||||
if (!PrepareForWrite(transaction_, from_vertex))
|
||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||
CHECK(!from_vertex->deleted) << "Invalid database state!";
|
||||
|
||||
if (to_vertex != from_vertex) {
|
||||
if (!PrepareForWrite(transaction_, to_vertex))
|
||||
return Result<bool>{Error::SERIALIZATION_ERROR};
|
||||
CHECK(!to_vertex->deleted) << "Invalid database state!";
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(transaction_, edge_ptr, Delta::RecreateObjectTag());
|
||||
edge_ptr->deleted = true;
|
||||
|
||||
CreateAndLinkDelta(transaction_, from_vertex, Delta::AddOutEdgeTag(),
|
||||
edge_type, to_vertex, edge_ptr);
|
||||
{
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, to_vertex, edge_ptr};
|
||||
auto it = std::find(from_vertex->out_edges.begin(),
|
||||
from_vertex->out_edges.end(), link);
|
||||
CHECK(it != from_vertex->out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *from_vertex->out_edges.rbegin());
|
||||
from_vertex->out_edges.pop_back();
|
||||
}
|
||||
|
||||
CreateAndLinkDelta(transaction_, to_vertex, Delta::AddInEdgeTag(), edge_type,
|
||||
from_vertex, edge_ptr);
|
||||
{
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{edge_type, from_vertex,
|
||||
edge_ptr};
|
||||
auto it =
|
||||
std::find(to_vertex->in_edges.begin(), to_vertex->in_edges.end(), link);
|
||||
CHECK(it != to_vertex->in_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *to_vertex->in_edges.rbegin());
|
||||
to_vertex->in_edges.pop_back();
|
||||
}
|
||||
|
||||
return Result<bool>{true};
|
||||
}
|
||||
|
||||
void Storage::Accessor::AdvanceCommand() { ++transaction_->command_id; }
|
||||
|
||||
void Storage::Accessor::Commit() {
|
||||
@ -153,6 +344,48 @@ void Storage::Accessor::Abort() {
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it =
|
||||
std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
CHECK(it == vertex->in_edges.end()) << "Invalid database state!";
|
||||
vertex->in_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it == vertex->out_edges.end()) << "Invalid database state!";
|
||||
vertex->out_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it =
|
||||
std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
CHECK(it != vertex->in_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->in_edges.rbegin());
|
||||
vertex->in_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it != vertex->out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->out_edges.rbegin());
|
||||
vertex->out_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->vertices_.access();
|
||||
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
|
||||
@ -167,6 +400,37 @@ void Storage::Accessor::Abort() {
|
||||
}
|
||||
vertex->delta = current;
|
||||
}
|
||||
for (Edge *edge : transaction_->modified_edges) {
|
||||
std::lock_guard<utils::SpinLock> guard(edge->lock);
|
||||
Delta *current = edge->delta;
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) ==
|
||||
transaction_->transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->edges_.access();
|
||||
CHECK(acc.remove(edge->gid)) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
edge->deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
LOG(FATAL) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
}
|
||||
current = current->next.load(std::memory_order_acquire);
|
||||
}
|
||||
edge->delta = current;
|
||||
}
|
||||
transaction_->is_active = false;
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,9 @@
|
||||
|
||||
#include "utils/skip_list.hpp"
|
||||
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
#include "storage/v2/result.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
@ -42,6 +45,13 @@ class Storage final {
|
||||
|
||||
Result<bool> DeleteVertex(VertexAccessor *vertex);
|
||||
|
||||
Result<bool> DetachDeleteVertex(VertexAccessor *vertex);
|
||||
|
||||
Result<EdgeAccessor> CreateEdge(VertexAccessor *from, VertexAccessor *to,
|
||||
uint64_t edge_type);
|
||||
|
||||
Result<bool> DeleteEdge(EdgeAccessor *edge);
|
||||
|
||||
void AdvanceCommand();
|
||||
|
||||
void Commit();
|
||||
@ -59,7 +69,9 @@ class Storage final {
|
||||
private:
|
||||
// Main object storage
|
||||
utils::SkipList<storage::Vertex> vertices_;
|
||||
utils::SkipList<storage::Edge> edges_;
|
||||
std::atomic<uint64_t> vertex_id_{0};
|
||||
std::atomic<uint64_t> edge_id_{0};
|
||||
|
||||
// Transaction engine
|
||||
utils::SpinLock lock_;
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include "utils/skip_list.hpp"
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
@ -44,6 +45,7 @@ struct Transaction {
|
||||
uint64_t command_id;
|
||||
std::list<Delta> deltas;
|
||||
std::list<Vertex *> modified_vertices;
|
||||
std::list<Edge *> modified_edges;
|
||||
bool is_active;
|
||||
bool must_abort;
|
||||
};
|
||||
|
@ -1,12 +1,14 @@
|
||||
#pragma once
|
||||
|
||||
#include <limits>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "utils/spin_lock.hpp"
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/gid.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -18,9 +20,13 @@ struct Vertex {
|
||||
}
|
||||
|
||||
Gid gid;
|
||||
|
||||
std::vector<uint64_t> labels;
|
||||
std::unordered_map<uint64_t, storage::PropertyValue> properties;
|
||||
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> in_edges;
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> out_edges;
|
||||
|
||||
utils::SpinLock lock;
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "storage/v2/edge_accessor.hpp"
|
||||
#include "storage/v2/mvcc.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -22,6 +23,10 @@ std::optional<VertexAccessor> VertexAccessor::Create(Vertex *vertex,
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
is_visible = true;
|
||||
@ -110,6 +115,10 @@ Result<bool> VertexAccessor::HasLabel(uint64_t label, View view) {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
@ -154,6 +163,10 @@ Result<std::vector<uint64_t>> VertexAccessor::Labels(View view) {
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
@ -226,6 +239,10 @@ Result<PropertyValue> VertexAccessor::GetProperty(uint64_t property,
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
@ -272,6 +289,10 @@ Result<std::unordered_map<uint64_t, PropertyValue>> VertexAccessor::Properties(
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
@ -283,4 +304,148 @@ Result<std::unordered_map<uint64_t, PropertyValue>> VertexAccessor::Properties(
|
||||
std::move(properties)};
|
||||
}
|
||||
|
||||
Result<std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>
|
||||
VertexAccessor::InEdges(const std::vector<uint64_t> &edge_types, View view) {
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> in_edges;
|
||||
bool deleted = false;
|
||||
Delta *delta = nullptr;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
deleted = vertex_->deleted;
|
||||
in_edges = vertex_->in_edges;
|
||||
delta = vertex_->delta;
|
||||
}
|
||||
ApplyDeltasForRead(
|
||||
transaction_, delta, view, [&deleted, &in_edges](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
// Add the edge because we don't see the removal.
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
|
||||
delta.vertex_edge.edge};
|
||||
auto it = std::find(in_edges.begin(), in_edges.end(), link);
|
||||
CHECK(it == in_edges.end()) << "Invalid database state!";
|
||||
in_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
// Remove the label because we don't see the addition.
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
|
||||
delta.vertex_edge.edge};
|
||||
auto it = std::find(in_edges.begin(), in_edges.end(), link);
|
||||
CHECK(it != in_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *in_edges.rbegin());
|
||||
in_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
LOG(FATAL) << "Invalid accessor!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
if (deleted) {
|
||||
return Result<
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>{
|
||||
Error::DELETED_OBJECT};
|
||||
}
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>> ret;
|
||||
ret.reserve(in_edges.size());
|
||||
for (const auto &item : in_edges) {
|
||||
auto [edge_type, from_vertex, edge] = item;
|
||||
if (edge_types.empty() || std::find(edge_types.begin(), edge_types.end(),
|
||||
edge_type) != edge_types.end()) {
|
||||
ret.emplace_back(
|
||||
edge_type, VertexAccessor{from_vertex, transaction_},
|
||||
EdgeAccessor{edge, edge_type, from_vertex, vertex_, transaction_});
|
||||
}
|
||||
}
|
||||
return Result<
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>{
|
||||
std::move(ret)};
|
||||
}
|
||||
|
||||
Result<std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>
|
||||
VertexAccessor::OutEdges(const std::vector<uint64_t> &edge_types, View view) {
|
||||
std::vector<std::tuple<uint64_t, Vertex *, Edge *>> out_edges;
|
||||
bool deleted = false;
|
||||
Delta *delta = nullptr;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex_->lock);
|
||||
deleted = vertex_->deleted;
|
||||
out_edges = vertex_->out_edges;
|
||||
delta = vertex_->delta;
|
||||
}
|
||||
ApplyDeltasForRead(
|
||||
transaction_, delta, view, [&deleted, &out_edges](const Delta &delta) {
|
||||
switch (delta.action) {
|
||||
case Delta::Action::ADD_OUT_EDGE: {
|
||||
// Add the edge because we don't see the removal.
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
|
||||
delta.vertex_edge.edge};
|
||||
auto it = std::find(out_edges.begin(), out_edges.end(), link);
|
||||
CHECK(it == out_edges.end()) << "Invalid database state!";
|
||||
out_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
// Remove the label because we don't see the addition.
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
delta.vertex_edge.edge_type, delta.vertex_edge.vertex,
|
||||
delta.vertex_edge.edge};
|
||||
auto it = std::find(out_edges.begin(), out_edges.end(), link);
|
||||
CHECK(it != out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *out_edges.rbegin());
|
||||
out_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
LOG(FATAL) << "Invalid accessor!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
break;
|
||||
}
|
||||
});
|
||||
if (deleted) {
|
||||
return Result<
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>{
|
||||
Error::DELETED_OBJECT};
|
||||
}
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>> ret;
|
||||
ret.reserve(out_edges.size());
|
||||
for (const auto &item : out_edges) {
|
||||
auto [edge_type, to_vertex, edge] = item;
|
||||
if (edge_types.empty() || std::find(edge_types.begin(), edge_types.end(),
|
||||
edge_type) != edge_types.end()) {
|
||||
ret.emplace_back(
|
||||
edge_type, VertexAccessor{to_vertex, transaction_},
|
||||
EdgeAccessor{edge, edge_type, vertex_, to_vertex, transaction_});
|
||||
}
|
||||
}
|
||||
return Result<
|
||||
std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>{
|
||||
std::move(ret)};
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
@ -10,16 +10,17 @@
|
||||
|
||||
namespace storage {
|
||||
|
||||
class EdgeAccessor;
|
||||
class Storage;
|
||||
|
||||
class VertexAccessor final {
|
||||
private:
|
||||
friend class Storage;
|
||||
|
||||
public:
|
||||
VertexAccessor(Vertex *vertex, Transaction *transaction)
|
||||
: vertex_(vertex), transaction_(transaction) {}
|
||||
|
||||
public:
|
||||
static std::optional<VertexAccessor> Create(Vertex *vertex,
|
||||
Transaction *transaction,
|
||||
View view);
|
||||
@ -38,8 +39,21 @@ class VertexAccessor final {
|
||||
|
||||
Result<std::unordered_map<uint64_t, PropertyValue>> Properties(View view);
|
||||
|
||||
Result<std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>
|
||||
InEdges(const std::vector<uint64_t> &edge_types, View view);
|
||||
|
||||
Result<std::vector<std::tuple<uint64_t, VertexAccessor, EdgeAccessor>>>
|
||||
OutEdges(const std::vector<uint64_t> &edge_types, View view);
|
||||
|
||||
Gid Gid() const { return vertex_->gid; }
|
||||
|
||||
bool operator==(const VertexAccessor &other) const {
|
||||
return vertex_ == other.vertex_ && transaction_ == other.transaction_;
|
||||
}
|
||||
bool operator!=(const VertexAccessor &other) const {
|
||||
return !(*this == other);
|
||||
}
|
||||
|
||||
private:
|
||||
Vertex *vertex_;
|
||||
Transaction *transaction_;
|
||||
|
@ -382,6 +382,9 @@ target_link_libraries(${test_prefix}auth mg-auth kvstore_lib)
|
||||
add_unit_test(property_value_v2.cpp)
|
||||
target_link_libraries(${test_prefix}property_value_v2 mg-utils)
|
||||
|
||||
add_unit_test(storage_v2_edge.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2_edge mg-storage-v2)
|
||||
|
||||
add_unit_test(storage_v2.cpp)
|
||||
target_link_libraries(${test_prefix}storage_v2 mg-storage-v2)
|
||||
|
||||
|
4624
tests/unit/storage_v2_edge.cpp
Normal file
4624
tests/unit/storage_v2_edge.cpp
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user