Introduce a reader writer spin lock (#1187)
It is possible for multiple read-only queries to access the same sequence of vertices/edges at the same time. The reader mode of the spin lock lets multiple such threads make progress concurrently.
parent e928eed028
commit 9661c52179
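The motivating pattern is read-mostly access to vertex and edge records: readers take the lock in shared mode while writers keep exclusive access. A minimal sketch of the usage pattern the diff below adopts (the `Record` type here is hypothetical; the real guarded types are `Vertex` and `Edge`, and the include path is assumed to match the new header added in this commit):

#include <mutex>         // std::unique_lock
#include <shared_mutex>  // std::shared_lock

#include "utils/rw_spin_lock.hpp"

// Hypothetical record; in the commit the guarded types are Vertex and Edge.
struct Record {
  mutable memgraph::utils::RWSpinLock lock;
  bool deleted = false;
};

bool IsVisible(const Record &record) {
  // Readers take the lock in shared mode, so many of them can hold it at once.
  auto guard = std::shared_lock{record.lock};
  return !record.deleted;
}

void MarkDeleted(Record &record) {
  // Writers take the lock exclusively, excluding readers and other writers.
  auto guard = std::unique_lock{record.lock};
  record.deleted = true;
}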
@@ -151,8 +151,8 @@ storage::Result<communication::bolt::Vertex> ToBoltVertex(const storage::VertexA
 properties[db.PropertyToName(prop.first)] = ToBoltValue(prop.second);
 }
 // Introduced in Bolt v5 (for now just send the ID)
-const auto element_id = std::to_string(id.AsInt());
-return communication::bolt::Vertex{id, labels, properties, element_id};
+auto element_id = std::to_string(id.AsInt());
+return communication::bolt::Vertex{id, std::move(labels), std::move(properties), std::move(element_id)};
 }

 storage::Result<communication::bolt::Edge> ToBoltEdge(const storage::EdgeAccessor &edge, const storage::Storage &db,
@@ -171,7 +171,8 @@ storage::Result<communication::bolt::Edge> ToBoltEdge(const storage::EdgeAccesso
 const auto element_id = std::to_string(id.AsInt());
 const auto from_element_id = std::to_string(from.AsInt());
 const auto to_element_id = std::to_string(to.AsInt());
-return communication::bolt::Edge{id, from, to, type, properties, element_id, from_element_id, to_element_id};
+return communication::bolt::Edge{
+    id, from, to, std::move(type), std::move(properties), element_id, from_element_id, to_element_id};
 }

 storage::Result<communication::bolt::Path> ToBoltPath(const query::Path &path, const storage::Storage &db,
@@ -1377,7 +1377,7 @@ void CreateSnapshot(Transaction *transaction, const std::filesystem::path &snaps
 bool is_visible = true;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(edge.lock);
+auto guard = std::shared_lock{edge.lock};
 is_visible = !edge.deleted;
 delta = edge.delta;
 }
@@ -490,7 +490,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, Config::Ite
 // actions.
 encoder->WriteMarker(Marker::SECTION_DELTA);
 encoder->WriteUint(timestamp);
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 switch (delta.action) {
 case Delta::Action::DELETE_DESERIALIZED_OBJECT:
 case Delta::Action::DELETE_OBJECT:
@@ -546,7 +546,7 @@ void EncodeDelta(BaseEncoder *encoder, NameIdMapper *name_id_mapper, const Delta
 // actions.
 encoder->WriteMarker(Marker::SECTION_DELTA);
 encoder->WriteUint(timestamp);
-std::lock_guard<utils::SpinLock> guard(edge.lock);
+auto guard = std::shared_lock{edge.lock};
 switch (delta.action) {
 case Delta::Action::SET_PROPERTY: {
 encoder->WriteMarker(Marker::DELTA_EDGE_SET_PROPERTY);
@@ -17,7 +17,7 @@
 #include "storage/v2/id_types.hpp"
 #include "storage/v2/property_store.hpp"
 #include "utils/logging.hpp"
-#include "utils/spin_lock.hpp"
+#include "utils/rw_spin_lock.hpp"

 namespace memgraph::storage {

@@ -34,7 +34,7 @@ struct Edge {

 PropertyStore properties;

-mutable utils::SpinLock lock;
+mutable utils::RWSpinLock lock;
 bool deleted;
 // uint8_t PAD;
 // uint16_t PAD;
@@ -32,7 +32,7 @@ bool EdgeAccessor::IsVisible(const View view) const {
 if (!config_.properties_on_edges) {
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(from_vertex_->lock);
+auto guard = std::shared_lock{from_vertex_->lock};
 // Initialize deleted by checking if out edges contain edge_
 deleted = std::find_if(from_vertex_->out_edges.begin(), from_vertex_->out_edges.end(), [&](const auto &out_edge) {
 return std::get<2>(out_edge) == edge_;
@@ -69,7 +69,7 @@ bool EdgeAccessor::IsVisible(const View view) const {

 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::shared_lock{edge_.ptr->lock};
 deleted = edge_.ptr->deleted;
 delta = edge_.ptr->delta;
 }
@@ -109,7 +109,7 @@ Result<storage::PropertyValue> EdgeAccessor::SetProperty(PropertyId property, co
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
 if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;

-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::unique_lock{edge_.ptr->lock};

 if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;

@@ -133,7 +133,7 @@ Result<bool> EdgeAccessor::InitProperties(const std::map<storage::PropertyId, st
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
 if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;

-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::unique_lock{edge_.ptr->lock};

 if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;

@@ -152,7 +152,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
 if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;

-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::unique_lock{edge_.ptr->lock};

 if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;

@@ -170,7 +170,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> EdgeAc
 Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::ClearProperties() {
 if (!config_.properties_on_edges) return Error::PROPERTIES_DISABLED;

-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::unique_lock{edge_.ptr->lock};

 if (!PrepareForWrite(transaction_, edge_.ptr)) return Error::SERIALIZATION_ERROR;

@@ -193,7 +193,7 @@ Result<PropertyValue> EdgeAccessor::GetProperty(PropertyId property, View view)
 PropertyValue value;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::shared_lock{edge_.ptr->lock};
 deleted = edge_.ptr->deleted;
 value = edge_.ptr->properties.GetProperty(property);
 delta = edge_.ptr->delta;
@@ -236,7 +236,7 @@ Result<std::map<PropertyId, PropertyValue>> EdgeAccessor::Properties(View view)
 std::map<PropertyId, PropertyValue> properties;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(edge_.ptr->lock);
+auto guard = std::shared_lock{edge_.ptr->lock};
 deleted = edge_.ptr->deleted;
 properties = edge_.ptr->properties.Properties();
 delta = edge_.ptr->delta;
@@ -51,7 +51,7 @@ inline bool AnyVersionHasLabel(const Vertex &vertex, LabelId label, uint64_t tim
 bool deleted{false};
 const Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 has_label = utils::Contains(vertex.labels, label);
 deleted = vertex.deleted;
 delta = vertex.delta;
@@ -105,7 +105,7 @@ inline bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, Prop
 bool deleted{false};
 const Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 has_label = utils::Contains(vertex.labels, label);
 current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
 deleted = vertex.deleted;
@@ -168,7 +168,7 @@ inline bool CurrentVersionHasLabelProperty(const Vertex &vertex, LabelId label,
 bool current_value_equal_to_value = value.IsNull();
 const Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 deleted = vertex.deleted;
 has_label = utils::Contains(vertex.labels, label);
 current_value_equal_to_value = vertex.properties.IsPropertyEqual(key, value);
@@ -131,7 +131,7 @@ void InMemoryReplicationServer::SnapshotHandler(slk::Reader *req_reader, slk::Bu
 MG_ASSERT(maybe_snapshot_path, "Failed to load snapshot!");
 spdlog::info("Received snapshot saved to {}", *maybe_snapshot_path);

-std::unique_lock<utils::RWLock> storage_guard(storage_->main_lock_);
+auto storage_guard = std::unique_lock{storage_->main_lock_};
 spdlog::trace("Clearing database since recovering from snapshot.");
 // Clear the database
 storage_->vertices_.clear();
@@ -416,7 +416,7 @@ uint64_t InMemoryReplicationServer::ReadAndApplyDelta(InMemoryStorage *storage,
 bool is_visible = true;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(edge->lock);
+auto guard = std::shared_lock{edge->lock};
 is_visible = !edge->deleted;
 delta = edge->delta;
 }
@@ -248,7 +248,7 @@ Result<std::optional<VertexAccessor>> InMemoryStorage::InMemoryAccessor::DeleteV
 "accessor when deleting a vertex!");
 auto *vertex_ptr = vertex->vertex_;

-std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
+auto guard = std::unique_lock{vertex_ptr->lock};

 if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;

@@ -286,7 +286,7 @@ InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
 std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;

 {
-std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
+auto guard = std::unique_lock{vertex_ptr->lock};

 if (!PrepareForWrite(&transaction_, vertex_ptr)) return Error::SERIALIZATION_ERROR;

@@ -326,7 +326,7 @@ InMemoryStorage::InMemoryAccessor::DetachDeleteVertex(VertexAccessor *vertex) {
 }
 }

-std::lock_guard<utils::SpinLock> guard(vertex_ptr->lock);
+auto guard = std::unique_lock{vertex_ptr->lock};

 // We need to check again for serialization errors because we unlocked the
 // vertex. Some other transaction could have modified the vertex in the
@@ -366,8 +366,8 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
 auto *to_vertex = to->vertex_;

 // Obtain the locks by `gid` order to avoid lock cycles.
-std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
-std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
+auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
+auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
 if (from_vertex->gid < to_vertex->gid) {
 guard_from.lock();
 guard_to.lock();
@@ -432,8 +432,8 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
 auto *to_vertex = to->vertex_;

 // Obtain the locks by `gid` order to avoid lock cycles.
-std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
-std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
+auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
+auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
 if (from_vertex->gid < to_vertex->gid) {
 guard_from.lock();
 guard_to.lock();
@@ -500,10 +500,10 @@ Result<std::optional<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdg
 auto edge_ref = edge->edge_;
 auto edge_type = edge->edge_type_;

-std::unique_lock<utils::SpinLock> guard;
+std::unique_lock<utils::RWSpinLock> guard;
 if (config_.properties_on_edges) {
 auto *edge_ptr = edge_ref.ptr;
-guard = std::unique_lock<utils::SpinLock>(edge_ptr->lock);
+guard = std::unique_lock{edge_ptr->lock};

 if (!PrepareForWrite(&transaction_, edge_ptr)) return Error::SERIALIZATION_ERROR;

@@ -514,8 +514,8 @@ Result<std::optional<EdgeAccessor>> InMemoryStorage::InMemoryAccessor::DeleteEdg
 auto *to_vertex = edge->to_vertex_;

 // Obtain the locks by `gid` order to avoid lock cycles.
-std::unique_lock<utils::SpinLock> guard_from(from_vertex->lock, std::defer_lock);
-std::unique_lock<utils::SpinLock> guard_to(to_vertex->lock, std::defer_lock);
+auto guard_from = std::unique_lock{from_vertex->lock, std::defer_lock};
+auto guard_to = std::unique_lock{to_vertex->lock, std::defer_lock};
 if (from_vertex->gid < to_vertex->gid) {
 guard_from.lock();
 guard_to.lock();
@@ -731,7 +731,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
 switch (prev.type) {
 case PreviousPtr::Type::VERTEX: {
 auto *vertex = prev.vertex;
-std::lock_guard<utils::SpinLock> guard(vertex->lock);
+auto guard = std::unique_lock{vertex->lock};
 Delta *current = vertex->delta;
 while (current != nullptr && current->timestamp->load(std::memory_order_acquire) ==
 transaction_.transaction_id.load(std::memory_order_acquire)) {
@@ -819,7 +819,7 @@ void InMemoryStorage::InMemoryAccessor::Abort() {
 }
 case PreviousPtr::Type::EDGE: {
 auto *edge = prev.edge;
-std::lock_guard<utils::SpinLock> guard(edge->lock);
+auto guard = std::lock_guard{edge->lock};
 Delta *current = edge->delta;
 while (current != nullptr && current->timestamp->load(std::memory_order_acquire) ==
 transaction_.transaction_id.load(std::memory_order_acquire)) {
@@ -1264,7 +1264,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
 switch (prev.type) {
 case PreviousPtr::Type::VERTEX: {
 Vertex *vertex = prev.vertex;
-std::lock_guard<utils::SpinLock> vertex_guard(vertex->lock);
+auto vertex_guard = std::unique_lock{vertex->lock};
 if (vertex->delta != &delta) {
 // Something changed, we're not the first delta in the chain
 // anymore.
@@ -1278,7 +1278,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
 }
 case PreviousPtr::Type::EDGE: {
 Edge *edge = prev.edge;
-std::lock_guard<utils::SpinLock> edge_guard(edge->lock);
+auto edge_guard = std::unique_lock{edge->lock};
 if (edge->delta != &delta) {
 // Something changed, we're not the first delta in the chain
 // anymore.
@@ -1297,7 +1297,7 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
 // part of the suffix later.
 break;
 }
-std::unique_lock<utils::SpinLock> guard;
+std::unique_lock<utils::RWSpinLock> guard;
 {
 // We need to find the parent object in order to be able to use
 // its lock.
@@ -1307,10 +1307,10 @@ void InMemoryStorage::CollectGarbage(std::unique_lock<utils::RWLock> main_guard)
 }
 switch (parent.type) {
 case PreviousPtr::Type::VERTEX:
-guard = std::unique_lock<utils::SpinLock>(parent.vertex->lock);
+guard = std::unique_lock{parent.vertex->lock};
 break;
 case PreviousPtr::Type::EDGE:
-guard = std::unique_lock<utils::SpinLock>(parent.edge->lock);
+guard = std::unique_lock{parent.edge->lock};
 break;
 case PreviousPtr::Type::DELTA:
 case PreviousPtr::Type::NULLPTR:
@@ -50,7 +50,7 @@ bool LastCommittedVersionHasLabelProperty(const Vertex &vertex, LabelId label, c
 bool deleted;
 bool has_label;
 {
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 delta = vertex.delta;
 deleted = vertex.deleted;
 has_label = utils::Contains(vertex.labels, label);
@@ -136,7 +136,7 @@ bool AnyVersionHasLabelProperty(const Vertex &vertex, LabelId label, const std::
 bool deleted;
 Delta *delta;
 {
-std::lock_guard<utils::SpinLock> guard(vertex.lock);
+auto guard = std::shared_lock{vertex.lock};
 has_label = utils::Contains(vertex.labels, label);
 deleted = vertex.deleted;
 delta = vertex.delta;
@@ -19,7 +19,7 @@
 #include "storage/v2/edge_ref.hpp"
 #include "storage/v2/id_types.hpp"
 #include "storage/v2/property_store.hpp"
-#include "utils/spin_lock.hpp"
+#include "utils/rw_spin_lock.hpp"

 namespace memgraph::storage {

@@ -38,7 +38,7 @@ struct Vertex {
 std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> in_edges;
 std::vector<std::tuple<EdgeTypeId, Vertex *, EdgeRef>> out_edges;

-mutable utils::SpinLock lock;
+mutable utils::RWSpinLock lock;
 bool deleted;
 // uint8_t PAD;
 // uint16_t PAD;
@@ -35,7 +35,7 @@ std::pair<bool, bool> IsVisible(Vertex const *vertex, Transaction const *transac
 bool deleted = false;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex->lock);
+auto guard = std::shared_lock{vertex->lock};
 deleted = vertex->deleted;
 delta = vertex->delta;
 }
@@ -90,7 +90,7 @@ bool VertexAccessor::IsVisible(View view) const {

 Result<bool> VertexAccessor::AddLabel(LabelId label) {
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
 if (vertex_->deleted) return Error::DELETED_OBJECT;
@@ -109,7 +109,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {

 /// TODO: move to after update and change naming to vertex after update
 Result<bool> VertexAccessor::RemoveLabel(LabelId label) {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;
 if (vertex_->deleted) return Error::DELETED_OBJECT;
@@ -135,7 +135,7 @@ Result<bool> VertexAccessor::HasLabel(LabelId label, View view) const {
 bool has_label = false;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 has_label = std::find(vertex_->labels.begin(), vertex_->labels.end(), label) != vertex_->labels.end();
 delta = vertex_->delta;
@@ -182,7 +182,7 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {
 std::vector<LabelId> labels;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 labels = vertex_->labels;
 delta = vertex_->delta;
@@ -225,7 +225,7 @@ Result<std::vector<LabelId>> VertexAccessor::Labels(View view) const {

 Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const PropertyValue &value) {
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;

@@ -250,7 +250,7 @@ Result<PropertyValue> VertexAccessor::SetProperty(PropertyId property, const Pro

 Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId, storage::PropertyValue> &properties) {
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;

@@ -269,7 +269,7 @@ Result<bool> VertexAccessor::InitProperties(const std::map<storage::PropertyId,
 Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> VertexAccessor::UpdateProperties(
 std::map<storage::PropertyId, storage::PropertyValue> &properties) const {
 utils::MemoryTracker::OutOfMemoryExceptionEnabler oom_exception;
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;

@@ -287,7 +287,7 @@ Result<std::vector<std::tuple<PropertyId, PropertyValue, PropertyValue>>> Vertex
 }

 Result<std::map<PropertyId, PropertyValue>> VertexAccessor::ClearProperties() {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::unique_lock{vertex_->lock};

 if (!PrepareForWrite(transaction_, vertex_)) return Error::SERIALIZATION_ERROR;

@@ -311,7 +311,7 @@ Result<PropertyValue> VertexAccessor::GetProperty(PropertyId property, View view
 PropertyValue value;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 value = vertex_->properties.GetProperty(property);
 delta = vertex_->delta;
@@ -359,7 +359,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view
 std::map<PropertyId, PropertyValue> properties;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 properties = vertex_->properties.Properties();
 delta = vertex_->delta;
@@ -424,7 +424,7 @@ Result<std::vector<EdgeAccessor>> VertexAccessor::InEdges(View view, const std::
 auto in_edges = edge_store{};
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 // TODO: a better filter copy
 if (edge_types.empty() && !destination) {
@@ -500,7 +500,7 @@ Result<std::vector<EdgeAccessor>> VertexAccessor::OutEdges(View view, const std:
 auto out_edges = edge_store{};
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 if (edge_types.empty() && !destination) {
 out_edges = vertex_->out_edges;
@@ -558,7 +558,7 @@ Result<size_t> VertexAccessor::InDegree(View view) const {
 size_t degree = 0;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 degree = vertex_->in_edges.size();
 delta = vertex_->delta;
@@ -606,7 +606,7 @@ Result<size_t> VertexAccessor::OutDegree(View view) const {
 size_t degree = 0;
 Delta *delta = nullptr;
 {
-std::lock_guard<utils::SpinLock> guard(vertex_->lock);
+auto guard = std::shared_lock{vertex_->lock};
 deleted = vertex_->deleted;
 degree = vertex_->out_edges.size();
 delta = vertex_->delta;
src/utils/rw_spin_lock.hpp (new file, 124 lines)
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once

#include <time.h>
#include <algorithm>
#include <atomic>
#include <cstdint>

namespace memgraph::utils {
namespace {
/// A helper for RWSpinLock, allows a contended spin lock to yield to another thread.
struct yeilder {
  void operator()() noexcept {
#if defined(__i386__) || defined(__x86_64__)
    // TODO: make portable
    __builtin_ia32_pause();
#endif
    ++count;
    if (count > 8) [[unlikely]] {
      count = 0;
      nanosleep(&shortpause, nullptr);
      // Increase the backoff
      shortpause.tv_nsec = std::min<decltype(shortpause.tv_nsec)>(shortpause.tv_nsec << 1, 512);
    }
  }

 private:
  uint_fast32_t count{0};
  timespec shortpause = {.tv_sec = 0, .tv_nsec = 1};
};
} // namespace

/**
 * A reader/writer spin lock.
 * State is stored in a uint32_t:
 * 0x0000'0001 - is the write bit,
 * the rest of the bits hold the count of current readers.
 * The lock is friendly to writers.
 * - writer lock() will wait for all readers to leave unlock_shared()
 * - new reader lock_shared() will wait until writer has unlock()
 **/
struct RWSpinLock {
  RWSpinLock() = default;

  void lock() {
    // spin: to grant the UNIQUE_LOCKED bit
    while (true) {
      // optimistic: assume we will be granted the lock
      auto const phase1 = std::atomic_ref{lock_status_}.fetch_or(UNIQUE_LOCKED, std::memory_order_acq_rel);
      // check: we were granted UNIQUE_LOCK and no current readers
      if (phase1 == 0) [[likely]]
        return;
      // check: we were granted UNIQUE_LOCK, but need to wait for readers
      if ((phase1 & UNIQUE_LOCKED) != UNIQUE_LOCKED) [[likely]]
        break;

      // spin: to wait for UNIQUE_LOCKED to be available
      auto maybe_yield = yeilder{};
      while (true) {
        auto const phase2 = std::atomic_ref{lock_status_}.load(std::memory_order_relaxed);
        // check: we are able to obtain UNIQUE_LOCK
        if ((phase2 & UNIQUE_LOCKED) != UNIQUE_LOCKED) [[likely]]
          break;
        maybe_yield();
      }
    }

    // spin: to wait for readers to leave
    auto maybe_yield = yeilder{};
    while (true) {
      auto const phase3 = std::atomic_ref{lock_status_}.load(std::memory_order_relaxed);
      // check: all readers have gone (leaving only the UNIQUE_LOCKED bit set)
      if (phase3 == UNIQUE_LOCKED) return;
      maybe_yield();
    }
  }

  void unlock() { std::atomic_ref{lock_status_}.fetch_and(~UNIQUE_LOCKED, std::memory_order_release); }

  void lock_shared() {
    while (true) {
      // optimistic: assume we will be granted the lock
      auto const phase1 = std::atomic_ref{lock_status_}.fetch_add(READER, std::memory_order_acquire);
      // check: we incremented reader count without the UNIQUE_LOCK already being held
      if ((phase1 & UNIQUE_LOCKED) != UNIQUE_LOCKED) [[likely]]
        return;
      // correct for our optimism, we shouldn't have modified the reader count
      std::atomic_ref{lock_status_}.fetch_sub(READER, std::memory_order_release);

      // spin: to wait for UNIQUE_LOCKED to be available
      auto maybe_yield = yeilder{};
      while (true) {
        auto const phase2 = std::atomic_ref{lock_status_}.load(std::memory_order_relaxed);
        // check: UNIQUE_LOCK was released
        if ((phase2 & UNIQUE_LOCKED) != UNIQUE_LOCKED) [[likely]]
          break;
        maybe_yield();
      }
    }
  }

  void unlock_shared() { std::atomic_ref{lock_status_}.fetch_sub(READER, std::memory_order_release); }

 private:
  using status_t = uint32_t;
  enum FLAGS : status_t {
    UNIQUE_LOCKED = 1,
    READER = 2,
  };

  // TODO: ATM not atomic, just used via atomic_ref, because the type needs to be movable into skip_list
  // fix the design flaw and then make RWSpinLock a non-copy/non-move type
  status_t lock_status_ = 0;
};
} // namespace memgraph::utils
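
Because the member functions follow the standard lock()/unlock() and lock_shared()/unlock_shared() naming, the std::lock_guard, std::unique_lock, and std::shared_lock guards used throughout the diff above work with RWSpinLock unchanged. Reading the status word: a value of 5, for example, decodes as UNIQUE_LOCKED (bit 0) set while two readers (2 x READER = 4) are still draining, and the writer in lock() keeps spinning until the word is exactly UNIQUE_LOCKED. Below is a standalone sketch of concurrent readers and one writer; it is illustrative only, not part of the commit, and it assumes the new header is on the include path:

#include <cstdint>
#include <mutex>         // std::unique_lock
#include <shared_mutex>  // std::shared_lock
#include <thread>
#include <vector>

#include "utils/rw_spin_lock.hpp"

int main() {
  memgraph::utils::RWSpinLock lock;
  std::uint64_t value = 0;

  auto reader = [&] {
    auto guard = std::shared_lock{lock};     // readers hold the lock concurrently
    [[maybe_unused]] auto copy = value;      // read under the shared lock
  };
  auto writer = [&] {
    auto guard = std::unique_lock{lock};     // waits for all readers to drain
    ++value;
  };

  std::vector<std::thread> threads;
  threads.emplace_back(writer);
  for (int i = 0; i < 4; ++i) threads.emplace_back(reader);
  for (auto &t : threads) t.join();
}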