Make Vertex|Edge lock member mutable
Reviewers: mferencevic, ipaljak Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2614
This commit is contained in:
parent
2011aea165
commit
0c7f384fd2
@ -62,10 +62,11 @@ inline bool DropExistenceConstraint(Constraints *constraints, LabelId label,
|
||||
/// nullopt if all checks pass, and `ExistenceConstraintViolation` describing
|
||||
/// the violated constraint otherwise.
|
||||
[[nodiscard]] inline std::optional<ExistenceConstraintViolation>
|
||||
ValidateExistenceConstraints(Vertex *vertex, Constraints *constraints) {
|
||||
for (const auto &[label, property] : constraints->existence_constraints) {
|
||||
if (!vertex->deleted && utils::Contains(vertex->labels, label) &&
|
||||
vertex->properties.find(property) == vertex->properties.end()) {
|
||||
ValidateExistenceConstraints(const Vertex &vertex,
|
||||
const Constraints &constraints) {
|
||||
for (const auto &[label, property] : constraints.existence_constraints) {
|
||||
if (!vertex.deleted && utils::Contains(vertex.labels, label) &&
|
||||
vertex.properties.find(property) == vertex.properties.end()) {
|
||||
return ExistenceConstraintViolation{label, property};
|
||||
}
|
||||
}
|
||||
|
@ -1136,29 +1136,29 @@ WalFile::~WalFile() {
|
||||
}
|
||||
}
|
||||
|
||||
void WalFile::AppendDelta(const Delta &delta, Vertex *vertex,
|
||||
void WalFile::AppendDelta(const Delta &delta, const Vertex &vertex,
|
||||
uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::RECREATE_OBJECT: {
|
||||
wal_.WriteMarker(VertexActionToMarker(delta.action));
|
||||
wal_.WriteUint(vertex->gid.AsUint());
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
break;
|
||||
}
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
wal_.WriteMarker(Marker::DELTA_VERTEX_SET_PROPERTY);
|
||||
wal_.WriteUint(vertex->gid.AsUint());
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// vertex.
|
||||
auto it = vertex->properties.find(delta.property.key);
|
||||
if (it != vertex->properties.end()) {
|
||||
auto it = vertex.properties.find(delta.property.key);
|
||||
if (it != vertex.properties.end()) {
|
||||
wal_.WritePropertyValue(it->second);
|
||||
} else {
|
||||
wal_.WritePropertyValue(PropertyValue());
|
||||
@ -1168,7 +1168,7 @@ void WalFile::AppendDelta(const Delta &delta, Vertex *vertex,
|
||||
case Delta::Action::ADD_LABEL:
|
||||
case Delta::Action::REMOVE_LABEL: {
|
||||
wal_.WriteMarker(VertexActionToMarker(delta.action));
|
||||
wal_.WriteUint(vertex->gid.AsUint());
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.label.AsUint()));
|
||||
break;
|
||||
}
|
||||
@ -1182,7 +1182,7 @@ void WalFile::AppendDelta(const Delta &delta, Vertex *vertex,
|
||||
}
|
||||
wal_.WriteString(
|
||||
name_id_mapper_->IdToName(delta.vertex_edge.edge_type.AsUint()));
|
||||
wal_.WriteUint(vertex->gid.AsUint());
|
||||
wal_.WriteUint(vertex.gid.AsUint());
|
||||
wal_.WriteUint(delta.vertex_edge.vertex->gid.AsUint());
|
||||
break;
|
||||
}
|
||||
@ -1195,22 +1195,23 @@ void WalFile::AppendDelta(const Delta &delta, Vertex *vertex,
|
||||
UpdateStats(timestamp);
|
||||
}
|
||||
|
||||
void WalFile::AppendDelta(const Delta &delta, Edge *edge, uint64_t timestamp) {
|
||||
void WalFile::AppendDelta(const Delta &delta, const Edge &edge,
|
||||
uint64_t timestamp) {
|
||||
// When converting a Delta to a WAL delta the logic is inverted. That is
|
||||
// because the Delta's represent undo actions and we want to store redo
|
||||
// actions.
|
||||
wal_.WriteMarker(Marker::SECTION_DELTA);
|
||||
wal_.WriteUint(timestamp);
|
||||
std::lock_guard<utils::SpinLock> guard(edge->lock);
|
||||
std::lock_guard<utils::SpinLock> guard(edge.lock);
|
||||
switch (delta.action) {
|
||||
case Delta::Action::SET_PROPERTY: {
|
||||
wal_.WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
|
||||
wal_.WriteUint(edge->gid.AsUint());
|
||||
wal_.WriteUint(edge.gid.AsUint());
|
||||
wal_.WriteString(name_id_mapper_->IdToName(delta.property.key.AsUint()));
|
||||
// The property value is the value that is currently stored in the
|
||||
// edge.
|
||||
auto it = edge->properties.find(delta.property.key);
|
||||
if (it != edge->properties.end()) {
|
||||
auto it = edge.properties.find(delta.property.key);
|
||||
if (it != edge.properties.end()) {
|
||||
wal_.WritePropertyValue(it->second);
|
||||
} else {
|
||||
wal_.WritePropertyValue(PropertyValue());
|
||||
@ -1381,7 +1382,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
transaction.commit_timestamp->load(std::memory_order_acquire);
|
||||
// Helper lambda that traverses the delta chain on order to find the first
|
||||
// delta that should be processed and then appends all discovered deltas.
|
||||
auto find_and_apply_deltas = [&](const auto *delta, auto *parent,
|
||||
auto find_and_apply_deltas = [&](const auto *delta, const auto &parent,
|
||||
auto filter) {
|
||||
while (true) {
|
||||
auto older = delta->next.load(std::memory_order_acquire);
|
||||
@ -1419,7 +1420,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::DELETE_OBJECT:
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
@ -1440,7 +1441,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::REMOVE_OUT_EDGE:
|
||||
return true;
|
||||
@ -1461,7 +1462,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::EDGE) continue;
|
||||
find_and_apply_deltas(&delta, prev.edge, [](auto action) {
|
||||
find_and_apply_deltas(&delta, *prev.edge, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::SET_PROPERTY:
|
||||
return true;
|
||||
@ -1482,7 +1483,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::ADD_OUT_EDGE:
|
||||
return true;
|
||||
@ -1503,7 +1504,7 @@ void Durability::AppendToWal(const Transaction &transaction,
|
||||
for (const auto &delta : transaction.deltas) {
|
||||
auto prev = delta.prev.Get();
|
||||
if (prev.type != PreviousPtr::Type::VERTEX) continue;
|
||||
find_and_apply_deltas(&delta, prev.vertex, [](auto action) {
|
||||
find_and_apply_deltas(&delta, *prev.vertex, [](auto action) {
|
||||
switch (action) {
|
||||
case Delta::Action::RECREATE_OBJECT:
|
||||
return true;
|
||||
|
@ -324,8 +324,9 @@ class WalFile {
|
||||
|
||||
~WalFile();
|
||||
|
||||
void AppendDelta(const Delta &delta, Vertex *vertex, uint64_t timestamp);
|
||||
void AppendDelta(const Delta &delta, Edge *edge, uint64_t timestamp);
|
||||
void AppendDelta(const Delta &delta, const Vertex &vertex,
|
||||
uint64_t timestamp);
|
||||
void AppendDelta(const Delta &delta, const Edge &edge, uint64_t timestamp);
|
||||
|
||||
void AppendTransactionEnd(uint64_t timestamp);
|
||||
|
||||
|
@ -22,7 +22,7 @@ struct Edge {
|
||||
|
||||
std::map<PropertyId, storage::PropertyValue> properties;
|
||||
|
||||
utils::SpinLock lock;
|
||||
mutable utils::SpinLock lock;
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
// uint16_t PAD;
|
||||
|
@ -11,7 +11,7 @@ namespace {
|
||||
/// delta. If the callback ever returns true, traversal is stopped and the
|
||||
/// function returns true. Otherwise, the function returns false.
|
||||
template <typename TCallback>
|
||||
bool AnyVersionSatisfiesPredicate(uint64_t timestamp, Delta *delta,
|
||||
bool AnyVersionSatisfiesPredicate(uint64_t timestamp, const Delta *delta,
|
||||
const TCallback &predicate) {
|
||||
while (delta != nullptr) {
|
||||
auto ts = delta->timestamp->load(std::memory_order_acquire);
|
||||
@ -30,15 +30,16 @@ bool AnyVersionSatisfiesPredicate(uint64_t timestamp, Delta *delta,
|
||||
|
||||
/// Helper function for label index garbage collection. Returns true if there's
|
||||
/// a reachable version of the vertex that has the given label.
|
||||
bool AnyVersionHasLabel(Vertex *vertex, LabelId label, uint64_t timestamp) {
|
||||
bool AnyVersionHasLabel(const Vertex &vertex, LabelId label,
|
||||
uint64_t timestamp) {
|
||||
bool has_label;
|
||||
bool deleted;
|
||||
Delta *delta;
|
||||
const Delta *delta;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
has_label = utils::Contains(vertex->labels, label);
|
||||
deleted = vertex->deleted;
|
||||
delta = vertex->delta;
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
deleted = vertex.deleted;
|
||||
delta = vertex.delta;
|
||||
}
|
||||
if (!deleted && has_label) {
|
||||
return true;
|
||||
@ -83,22 +84,22 @@ bool AnyVersionHasLabel(Vertex *vertex, LabelId label, uint64_t timestamp) {
|
||||
/// there's a reachable version of the vertex that has the given label and
|
||||
/// property value.
|
||||
/// @throw std::bad_alloc if unable to copy the PropertyValue
|
||||
bool AnyVersionHasLabelProperty(Vertex *vertex, LabelId label, PropertyId key,
|
||||
const PropertyValue &value,
|
||||
bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label,
|
||||
PropertyId key, const PropertyValue &value,
|
||||
uint64_t timestamp) {
|
||||
bool has_label;
|
||||
PropertyValue current_value;
|
||||
bool deleted;
|
||||
Delta *delta;
|
||||
const Delta *delta;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
has_label = utils::Contains(vertex->labels, label);
|
||||
auto it = vertex->properties.find(key);
|
||||
if (it != vertex->properties.end()) {
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
auto it = vertex.properties.find(key);
|
||||
if (it != vertex.properties.end()) {
|
||||
current_value = it->second;
|
||||
}
|
||||
deleted = vertex->deleted;
|
||||
delta = vertex->delta;
|
||||
deleted = vertex.deleted;
|
||||
delta = vertex.delta;
|
||||
}
|
||||
|
||||
if (!deleted && has_label && current_value == value) {
|
||||
@ -150,16 +151,16 @@ bool AnyVersionHasLabelProperty(Vertex *vertex, LabelId label, PropertyId key,
|
||||
// Helper function for iterating through label index. Returns true if this
|
||||
// transaction can see the given vertex, and the visible version has the given
|
||||
// label.
|
||||
bool CurrentVersionHasLabel(Vertex *vertex, LabelId label,
|
||||
bool CurrentVersionHasLabel(const Vertex &vertex, LabelId label,
|
||||
Transaction *transaction, View view) {
|
||||
bool deleted;
|
||||
bool has_label;
|
||||
Delta *delta;
|
||||
const Delta *delta;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
deleted = vertex->deleted;
|
||||
has_label = utils::Contains(vertex->labels, label);
|
||||
delta = vertex->delta;
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
deleted = vertex.deleted;
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
delta = vertex.delta;
|
||||
}
|
||||
ApplyDeltasForRead(transaction, delta, view,
|
||||
[&deleted, &has_label, label](const Delta &delta) {
|
||||
@ -203,22 +204,22 @@ bool CurrentVersionHasLabel(Vertex *vertex, LabelId label,
|
||||
// this transaction can see the given vertex, and the visible version has the
|
||||
// given label and property.
|
||||
// @throw std::bad_alloc if unable to copy the PropertyValue
|
||||
bool CurrentVersionHasLabelProperty(Vertex *vertex, LabelId label,
|
||||
bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label,
|
||||
PropertyId key, const PropertyValue &value,
|
||||
Transaction *transaction, View view) {
|
||||
bool deleted;
|
||||
bool has_label;
|
||||
PropertyValue current_value;
|
||||
Delta *delta;
|
||||
const Delta *delta;
|
||||
{
|
||||
std::lock_guard<utils::SpinLock> guard(vertex->lock);
|
||||
deleted = vertex->deleted;
|
||||
has_label = utils::Contains(vertex->labels, label);
|
||||
auto it = vertex->properties.find(key);
|
||||
if (it != vertex->properties.end()) {
|
||||
std::lock_guard<utils::SpinLock> guard(vertex.lock);
|
||||
deleted = vertex.deleted;
|
||||
has_label = utils::Contains(vertex.labels, label);
|
||||
auto it = vertex.properties.find(key);
|
||||
if (it != vertex.properties.end()) {
|
||||
current_value = it->second;
|
||||
}
|
||||
delta = vertex->delta;
|
||||
delta = vertex.delta;
|
||||
}
|
||||
ApplyDeltasForRead(
|
||||
transaction, delta, view,
|
||||
@ -313,7 +314,7 @@ void LabelIndex::RemoveObsoleteEntries(uint64_t oldest_active_start_timestamp) {
|
||||
}
|
||||
|
||||
if ((next_it != vertices_acc.end() && it->vertex == next_it->vertex) ||
|
||||
!AnyVersionHasLabel(it->vertex, label_storage.first,
|
||||
!AnyVersionHasLabel(*it->vertex, label_storage.first,
|
||||
oldest_active_start_timestamp)) {
|
||||
vertices_acc.remove(*it);
|
||||
}
|
||||
@ -343,7 +344,7 @@ void LabelIndex::Iterable::Iterator::AdvanceUntilValid() {
|
||||
if (index_iterator_->vertex == current_vertex_) {
|
||||
continue;
|
||||
}
|
||||
if (CurrentVersionHasLabel(index_iterator_->vertex, self_->label_,
|
||||
if (CurrentVersionHasLabel(*index_iterator_->vertex, self_->label_,
|
||||
self_->transaction_, self_->view_)) {
|
||||
current_vertex_ = index_iterator_->vertex;
|
||||
current_vertex_accessor_ =
|
||||
@ -470,7 +471,7 @@ void LabelPropertyIndex::RemoveObsoleteEntries(
|
||||
|
||||
if ((next_it != index_acc.end() && it->vertex == next_it->vertex &&
|
||||
it->value == next_it->value) ||
|
||||
!AnyVersionHasLabelProperty(it->vertex, label_property.first,
|
||||
!AnyVersionHasLabelProperty(*it->vertex, label_property.first,
|
||||
label_property.second, it->value,
|
||||
oldest_active_start_timestamp)) {
|
||||
index_acc.remove(*it);
|
||||
@ -523,7 +524,7 @@ void LabelPropertyIndex::Iterable::Iterator::AdvanceUntilValid() {
|
||||
}
|
||||
}
|
||||
|
||||
if (CurrentVersionHasLabelProperty(index_iterator_->vertex, self_->label_,
|
||||
if (CurrentVersionHasLabelProperty(*index_iterator_->vertex, self_->label_,
|
||||
self_->property_, index_iterator_->value,
|
||||
self_->transaction_, self_->view_)) {
|
||||
current_vertex_ = index_iterator_->vertex;
|
||||
|
@ -14,7 +14,7 @@ namespace storage {
|
||||
/// should be applied passed as a parameter to the callback. It is up to the
|
||||
/// caller to apply the deltas.
|
||||
template <typename TCallback>
|
||||
inline void ApplyDeltasForRead(Transaction *transaction, Delta *delta,
|
||||
inline void ApplyDeltasForRead(Transaction *transaction, const Delta *delta,
|
||||
View view, const TCallback &callback) {
|
||||
while (delta != nullptr) {
|
||||
auto ts = delta->timestamp->load(std::memory_order_acquire);
|
||||
|
@ -655,7 +655,7 @@ Storage::Accessor::Commit() {
|
||||
// No need to take any locks here because we modified this vertex and no
|
||||
// one else can touch it until we commit.
|
||||
auto validation_result =
|
||||
ValidateExistenceConstraints(prev.vertex, &storage_->constraints_);
|
||||
ValidateExistenceConstraints(*prev.vertex, storage_->constraints_);
|
||||
if (validation_result) {
|
||||
Abort();
|
||||
return *validation_result;
|
||||
|
@ -27,7 +27,7 @@ struct Vertex {
|
||||
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
|
||||
std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;
|
||||
|
||||
utils::SpinLock lock;
|
||||
mutable utils::SpinLock lock;
|
||||
bool deleted;
|
||||
// uint8_t PAD;
|
||||
// uint16_t PAD;
|
||||
|
@ -140,9 +140,9 @@ class DeltaGenerator final {
|
||||
owner = owner.delta->prev.Get();
|
||||
}
|
||||
if (owner.type == storage::PreviousPtr::Type::VERTEX) {
|
||||
gen_->wal_file_.AppendDelta(delta, owner.vertex, commit_timestamp);
|
||||
gen_->wal_file_.AppendDelta(delta, *owner.vertex, commit_timestamp);
|
||||
} else if (owner.type == storage::PreviousPtr::Type::EDGE) {
|
||||
gen_->wal_file_.AppendDelta(delta, owner.edge, commit_timestamp);
|
||||
gen_->wal_file_.AppendDelta(delta, *owner.edge, commit_timestamp);
|
||||
} else {
|
||||
LOG(FATAL) << "Invalid delta owner!";
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user