Implement multiple previous pointer in storage v2
Reviewers: mtomic, teon.banek Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2190
This commit is contained in:
parent
5bc0be50c5
commit
4d94aa740a
@ -2,6 +2,8 @@
|
||||
|
||||
#include <atomic>
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "storage/v2/property_value.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -9,6 +11,80 @@ namespace storage {
|
||||
// Forward declarations because we only store pointers here.
|
||||
struct Vertex;
|
||||
struct Edge;
|
||||
struct Delta;
|
||||
|
||||
// This class stores one of three pointers (`Delta`, `Vertex` and `Edge`)
|
||||
// without using additional memory for storing the type. The type is stored in
|
||||
// the pointer itself in the lower bits. All of those structures contain large
|
||||
// items in themselves (e.g. `uint64_t`) that require the pointer to be aligned
|
||||
// to their size (for `uint64_t` it is 8). That means that the pointer will
|
||||
// always be a multiple of 8 which implies that the lower 3 bits of the pointer
|
||||
// will always be 0. We can use those 3 bits to store information about the type
|
||||
// of the pointer stored (2 bits).
|
||||
class PreviousPtr {
|
||||
private:
|
||||
static constexpr uintptr_t kDelta = 0b01UL;
|
||||
static constexpr uintptr_t kVertex = 0b10UL;
|
||||
static constexpr uintptr_t kEdge = 0b11UL;
|
||||
|
||||
static constexpr uintptr_t kMask = 0b11UL;
|
||||
|
||||
public:
|
||||
enum class Type {
|
||||
DELTA,
|
||||
VERTEX,
|
||||
EDGE,
|
||||
};
|
||||
|
||||
void Set(Delta *delta) {
|
||||
uintptr_t value = reinterpret_cast<uintptr_t>(delta);
|
||||
CHECK((value & kMask) == 0) << "Invalid pointer!";
|
||||
storage_ = value | kDelta;
|
||||
}
|
||||
|
||||
void Set(Vertex *vertex) {
|
||||
uintptr_t value = reinterpret_cast<uintptr_t>(vertex);
|
||||
CHECK((value & kMask) == 0) << "Invalid pointer!";
|
||||
storage_ = value | kVertex;
|
||||
}
|
||||
|
||||
void Set(Edge *edge) {
|
||||
uintptr_t value = reinterpret_cast<uintptr_t>(edge);
|
||||
CHECK((value & kMask) == 0) << "Invalid pointer!";
|
||||
storage_ = value | kEdge;
|
||||
}
|
||||
|
||||
Type GetType() const {
|
||||
uintptr_t type = storage_ & kMask;
|
||||
if (type == kDelta) {
|
||||
return Type::DELTA;
|
||||
} else if (type == kVertex) {
|
||||
return Type::VERTEX;
|
||||
} else if (type == kEdge) {
|
||||
return Type::EDGE;
|
||||
} else {
|
||||
LOG(FATAL) << "Invalid pointer type!";
|
||||
}
|
||||
}
|
||||
|
||||
Delta *GetDelta() const {
|
||||
CHECK((storage_ & kMask) == kDelta) << "Can't convert pointer to delta!";
|
||||
return reinterpret_cast<Delta *>(storage_ & ~kMask);
|
||||
}
|
||||
|
||||
Vertex *GetVertex() const {
|
||||
CHECK((storage_ & kMask) == kVertex) << "Can't convert pointer to vertex!";
|
||||
return reinterpret_cast<Vertex *>(storage_ & ~kMask);
|
||||
}
|
||||
|
||||
Edge *GetEdge() const {
|
||||
CHECK((storage_ & kMask) == kEdge) << "Can't convert pointer to edge!";
|
||||
return reinterpret_cast<Edge *>(storage_ & ~kMask);
|
||||
}
|
||||
|
||||
private:
|
||||
uintptr_t storage_{0};
|
||||
};
|
||||
|
||||
struct Delta {
|
||||
enum class Action {
|
||||
@ -141,7 +217,7 @@ struct Delta {
|
||||
// TODO: optimize with in-place copy
|
||||
std::atomic<uint64_t> *timestamp;
|
||||
uint64_t command_id;
|
||||
Delta *prev{nullptr};
|
||||
PreviousPtr prev;
|
||||
std::atomic<Delta *> next{nullptr};
|
||||
|
||||
union {
|
||||
@ -176,4 +252,7 @@ struct Delta {
|
||||
}
|
||||
};
|
||||
|
||||
static_assert(alignof(Delta) >= 8,
|
||||
"The Delta should be aligned to at least 8!");
|
||||
|
||||
} // namespace storage
|
||||
|
@ -30,6 +30,8 @@ struct Edge {
|
||||
Delta *delta;
|
||||
};
|
||||
|
||||
static_assert(alignof(Edge) >= 8, "The Edge should be aligned to at least 8!");
|
||||
|
||||
inline bool operator==(const Edge &first, const Edge &second) {
|
||||
return first.gid == second.gid;
|
||||
}
|
||||
|
@ -1,12 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "storage/v2/delta.hpp"
|
||||
#include "storage/v2/edge.hpp"
|
||||
#include "storage/v2/property_value.hpp"
|
||||
#include "storage/v2/transaction.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
|
||||
namespace storage {
|
||||
@ -80,8 +76,7 @@ inline Delta *CreateDeleteObjectDelta(Transaction *transaction) {
|
||||
}
|
||||
|
||||
/// This function creates a delta in the transaction for the object and links
|
||||
/// the delta into the object's delta list. It also adds the object to the
|
||||
/// transaction's modified objects list.
|
||||
/// the delta into the object's delta list.
|
||||
template <typename TObj, class... Args>
|
||||
inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
|
||||
Args &&... args) {
|
||||
@ -89,24 +84,14 @@ inline void CreateAndLinkDelta(Transaction *transaction, TObj *object,
|
||||
&transaction->commit_timestamp,
|
||||
transaction->command_id);
|
||||
|
||||
// The operations are written in such order so that both `next` and `prev`
|
||||
// chains are valid at all times.
|
||||
delta->prev.Set(object);
|
||||
if (object->delta) {
|
||||
object->delta->prev = delta;
|
||||
object->delta->prev.Set(delta);
|
||||
}
|
||||
delta->next.store(object->delta, std::memory_order_release);
|
||||
object->delta = delta;
|
||||
|
||||
if constexpr (std::is_same_v<TObj, Vertex>) {
|
||||
if (transaction->modified_vertices.empty() ||
|
||||
transaction->modified_vertices.back() != object) {
|
||||
transaction->modified_vertices.push_back(object);
|
||||
}
|
||||
}
|
||||
if constexpr (std::is_same_v<TObj, Edge>) {
|
||||
if (transaction->modified_edges.empty() ||
|
||||
transaction->modified_edges.back() != object) {
|
||||
transaction->modified_edges.push_back(object);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace storage
|
||||
|
@ -63,7 +63,7 @@ VertexAccessor Storage::Accessor::CreateVertex() {
|
||||
auto [it, inserted] = acc.insert(Vertex{storage::Gid::FromUint(gid), delta});
|
||||
CHECK(inserted) << "The vertex must be inserted here!";
|
||||
CHECK(it != acc.end()) << "Invalid Vertex accessor!";
|
||||
transaction_->modified_vertices.push_back(&*it);
|
||||
delta->prev.Set(&*it);
|
||||
return VertexAccessor{&*it, transaction_};
|
||||
}
|
||||
|
||||
@ -201,7 +201,7 @@ Result<EdgeAccessor> Storage::Accessor::CreateEdge(VertexAccessor *from,
|
||||
CHECK(inserted) << "The edge must be inserted here!";
|
||||
CHECK(it != acc.end()) << "Invalid Edge accessor!";
|
||||
auto edge = &*it;
|
||||
transaction_->modified_edges.push_back(edge);
|
||||
delta->prev.Set(&*it);
|
||||
|
||||
CreateAndLinkDelta(transaction_, from_vertex, Delta::RemoveOutEdgeTag(),
|
||||
edge_type, to_vertex, edge);
|
||||
@ -306,145 +306,163 @@ void Storage::Accessor::Commit() {
|
||||
|
||||
void Storage::Accessor::Abort() {
|
||||
CHECK(transaction_->is_active) << "The transaction is already terminated!";
|
||||
for (Vertex *vertex : transaction_->modified_vertices) {
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
Delta *current = vertex->delta;
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) ==
|
||||
transaction_->transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->label);
|
||||
CHECK(it != vertex->labels.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->labels.rbegin());
|
||||
vertex->labels.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->label);
|
||||
CHECK(it == vertex->labels.end()) << "Invalid database state!";
|
||||
vertex->labels.push_back(current->label);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = vertex->properties.find(current->property.key);
|
||||
if (it != vertex->properties.end()) {
|
||||
if (current->property.value.IsNull()) {
|
||||
// remove the property
|
||||
vertex->properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = current->property.value;
|
||||
for (const auto &delta : transaction_->deltas) {
|
||||
switch (delta.prev.GetType()) {
|
||||
case PreviousPtr::Type::VERTEX: {
|
||||
auto vertex = delta.prev.GetVertex();
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
Delta *current = vertex->delta;
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) ==
|
||||
transaction_->transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->label);
|
||||
CHECK(it != vertex->labels.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->labels.rbegin());
|
||||
vertex->labels.pop_back();
|
||||
break;
|
||||
}
|
||||
} else if (!current->property.value.IsNull()) {
|
||||
vertex->properties.emplace(current->property.key,
|
||||
current->property.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it =
|
||||
std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
CHECK(it == vertex->in_edges.end()) << "Invalid database state!";
|
||||
vertex->in_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it == vertex->out_edges.end()) << "Invalid database state!";
|
||||
vertex->out_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it =
|
||||
std::find(vertex->in_edges.begin(), vertex->in_edges.end(), link);
|
||||
CHECK(it != vertex->in_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->in_edges.rbegin());
|
||||
vertex->in_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it != vertex->out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->out_edges.rbegin());
|
||||
vertex->out_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->vertices_.access();
|
||||
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
vertex->deleted = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
current = current->next.load(std::memory_order_acquire);
|
||||
}
|
||||
vertex->delta = current;
|
||||
}
|
||||
for (Edge *edge : transaction_->modified_edges) {
|
||||
std::lock_guard<utils::SpinLock> guard(edge->lock);
|
||||
Delta *current = edge->delta;
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) ==
|
||||
transaction_->transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = edge->properties.find(current->property.key);
|
||||
if (it != edge->properties.end()) {
|
||||
if (current->property.value.IsNull()) {
|
||||
// remove the property
|
||||
edge->properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = current->property.value;
|
||||
case Delta::Action::ADD_LABEL: {
|
||||
auto it = std::find(vertex->labels.begin(), vertex->labels.end(),
|
||||
current->label);
|
||||
CHECK(it == vertex->labels.end()) << "Invalid database state!";
|
||||
vertex->labels.push_back(current->label);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = vertex->properties.find(current->property.key);
|
||||
if (it != vertex->properties.end()) {
|
||||
if (current->property.value.IsNull()) {
|
||||
// remove the property
|
||||
vertex->properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = current->property.value;
|
||||
}
|
||||
} else if (!current->property.value.IsNull()) {
|
||||
vertex->properties.emplace(current->property.key,
|
||||
current->property.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->in_edges.begin(),
|
||||
vertex->in_edges.end(), link);
|
||||
CHECK(it == vertex->in_edges.end()) << "Invalid database state!";
|
||||
vertex->in_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::ADD_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it == vertex->out_edges.end()) << "Invalid database state!";
|
||||
vertex->out_edges.push_back(link);
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_IN_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->in_edges.begin(),
|
||||
vertex->in_edges.end(), link);
|
||||
CHECK(it != vertex->in_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->in_edges.rbegin());
|
||||
vertex->in_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
std::tuple<uint64_t, Vertex *, Edge *> link{
|
||||
current->vertex_edge.edge_type, current->vertex_edge.vertex,
|
||||
current->vertex_edge.edge};
|
||||
auto it = std::find(vertex->out_edges.begin(),
|
||||
vertex->out_edges.end(), link);
|
||||
CHECK(it != vertex->out_edges.end()) << "Invalid database state!";
|
||||
std::swap(*it, *vertex->out_edges.rbegin());
|
||||
vertex->out_edges.pop_back();
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->vertices_.access();
|
||||
CHECK(acc.remove(vertex->gid)) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
vertex->deleted = false;
|
||||
break;
|
||||
}
|
||||
} else if (!current->property.value.IsNull()) {
|
||||
edge->properties.emplace(current->property.key,
|
||||
current->property.value);
|
||||
}
|
||||
break;
|
||||
current = current->next.load(std::memory_order_acquire);
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->edges_.access();
|
||||
CHECK(acc.remove(edge->gid)) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
edge->deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
LOG(FATAL) << "Invalid database state!";
|
||||
break;
|
||||
vertex->delta = current;
|
||||
if (current != nullptr) {
|
||||
current->prev.Set(vertex);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
current = current->next.load(std::memory_order_acquire);
|
||||
case PreviousPtr::Type::EDGE: {
|
||||
auto edge = delta.prev.GetEdge();
|
||||
std::lock_guard<utils::SpinLock> guard(edge->lock);
|
||||
Delta *current = edge->delta;
|
||||
while (current != nullptr &&
|
||||
current->timestamp->load(std::memory_order_acquire) ==
|
||||
transaction_->transaction_id) {
|
||||
switch (current->action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
auto it = edge->properties.find(current->property.key);
|
||||
if (it != edge->properties.end()) {
|
||||
if (current->property.value.IsNull()) {
|
||||
// remove the property
|
||||
edge->properties.erase(it);
|
||||
} else {
|
||||
// set the value
|
||||
it->second = current->property.value;
|
||||
}
|
||||
} else if (!current->property.value.IsNull()) {
|
||||
edge->properties.emplace(current->property.key,
|
||||
current->property.value);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Delta::Action::DELETE_OBJECT: {
|
||||
auto acc = storage_->edges_.access();
|
||||
CHECK(acc.remove(edge->gid)) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
edge->deleted = false;
|
||||
break;
|
||||
}
|
||||
case Delta::Action::REMOVE_LABEL:
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::ADD_IN_EDGE:
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
case Delta::Action::REMOVE_IN_EDGE:
|
||||
case Delta::Action::REMOVE_OUT_EDGE: {
|
||||
LOG(FATAL) << "Invalid database state!";
|
||||
break;
|
||||
}
|
||||
}
|
||||
current = current->next.load(std::memory_order_acquire);
|
||||
}
|
||||
edge->delta = current;
|
||||
if (current != nullptr) {
|
||||
current->prev.Set(edge);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case PreviousPtr::Type::DELTA:
|
||||
break;
|
||||
}
|
||||
edge->delta = current;
|
||||
}
|
||||
transaction_->is_active = false;
|
||||
}
|
||||
|
@ -29,7 +29,6 @@ struct Transaction {
|
||||
commit_timestamp(other.commit_timestamp.load()),
|
||||
command_id(other.command_id),
|
||||
deltas(std::move(other.deltas)),
|
||||
modified_vertices(std::move(other.modified_vertices)),
|
||||
is_active(other.is_active),
|
||||
must_abort(other.must_abort) {}
|
||||
|
||||
@ -44,8 +43,6 @@ struct Transaction {
|
||||
std::atomic<uint64_t> commit_timestamp;
|
||||
uint64_t command_id;
|
||||
std::list<Delta> deltas;
|
||||
std::list<Vertex *> modified_vertices;
|
||||
std::list<Edge *> modified_edges;
|
||||
bool is_active;
|
||||
bool must_abort;
|
||||
};
|
||||
|
@ -35,6 +35,9 @@ struct Vertex {
|
||||
Delta *delta;
|
||||
};
|
||||
|
||||
static_assert(alignof(Vertex) >= 8,
|
||||
"The Vertex should be aligned to at least 8!");
|
||||
|
||||
inline bool operator==(const Vertex &first, const Vertex &second) {
|
||||
return first.gid == second.gid;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user