Make the metadata storing objects threadsafe

The objects stored_node_labels_ and stored_edge_types_ can be accesses
through separate threads but it was not safe to do so. This commit
replaces the standard containers with threadsafe ones.
This commit is contained in:
gvolfing 2023-11-08 14:43:06 +01:00
parent 2946d74fdd
commit df3274d78f
6 changed files with 74 additions and 14 deletions

View File

@ -944,7 +944,7 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
storage_->stored_edge_types_.insert(edge_type);
storage_->stored_edge_types_.try_insert(edge_type);
storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);

View File

@ -278,7 +278,7 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
if (to_vertex->deleted) return Error::DELETED_OBJECT;
}
storage_->stored_edge_types_.insert(edge_type);
storage_->stored_edge_types_.try_insert(edge_type);
auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
auto gid = storage::Gid::FromUint(mem_storage->edge_id_.fetch_add(1, std::memory_order_acq_rel));
EdgeRef edge(gid);
@ -343,7 +343,7 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
if (to_vertex->deleted) return Error::DELETED_OBJECT;
}
storage_->stored_edge_types_.insert(edge_type);
storage_->stored_edge_types_.try_insert(edge_type);
// NOTE: When we update the next `edge_id_` here we perform a RMW
// (read-modify-write) operation that ISN'T atomic! But, that isn't an issue

View File

@ -106,19 +106,13 @@ std::optional<uint64_t> Storage::Accessor::GetTransactionId() const {
std::vector<LabelId> Storage::Accessor::ListAllPossiblyPresentVertexLabels() const {
std::vector<LabelId> vertex_labels;
vertex_labels.reserve(storage_->stored_node_labels_.size());
for (const auto label : storage_->stored_node_labels_) {
vertex_labels.push_back(label);
}
storage_->stored_node_labels_.for_each([&vertex_labels](const auto &label) { vertex_labels.push_back(label); });
return vertex_labels;
}
std::vector<EdgeTypeId> Storage::Accessor::ListAllPossiblyPresentEdgeTypes() const {
std::vector<EdgeTypeId> edge_types;
edge_types.reserve(storage_->stored_edge_types_.size());
for (const auto edge_type : storage_->stored_edge_types_) {
edge_types.push_back(edge_type);
}
storage_->stored_edge_types_.for_each([&edge_types](const auto &type) { edge_types.push_back(type); });
return edge_types;
}

View File

@ -40,6 +40,7 @@
#include "utils/event_histogram.hpp"
#include "utils/resource_lock.hpp"
#include "utils/scheduler.hpp"
#include "utils/synchronized_metadata_store.hpp"
#include "utils/timer.hpp"
#include "utils/uuid.hpp"
@ -407,8 +408,8 @@ class Storage {
// be present in the database.
// TODO(gvolfing): check if this would be faster with flat_maps.
std::unordered_set<LabelId> stored_node_labels_;
std::unordered_set<EdgeTypeId> stored_edge_types_;
utils::SynchronizedMetaDataStore<LabelId> stored_node_labels_;
utils::SynchronizedMetaDataStore<EdgeTypeId> stored_edge_types_;
std::atomic<uint64_t> vertex_id_{0};
std::atomic<uint64_t> edge_id_{0};

View File

@ -109,7 +109,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
vertex_->labels.push_back(label);
storage_->stored_node_labels_.insert(label);
storage_->stored_node_labels_.try_insert(label);
/// TODO: some by pointers, some by reference => not good, make it better
storage_->constraints_.unique_constraints_->UpdateOnAddLabel(label, *vertex_, transaction_->start_timestamp);

View File

@ -0,0 +1,65 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <mutex>
#include <shared_mutex>
#include <unordered_set>
#include "utils/rw_lock.hpp"
#include "utils/synchronized.hpp"
namespace memgraph::utils {
template <typename T>
class SynchronizedMetaDataStore {
public:
SynchronizedMetaDataStore() = default;
~SynchronizedMetaDataStore() = default;
SynchronizedMetaDataStore(const SynchronizedMetaDataStore &) = delete;
SynchronizedMetaDataStore(SynchronizedMetaDataStore &&) = delete;
SynchronizedMetaDataStore &operator=(const SynchronizedMetaDataStore &) = delete;
SynchronizedMetaDataStore &operator=(SynchronizedMetaDataStore &&) = delete;
void try_insert(const T &elem) {
{
std::shared_lock read_lock(lock_);
if (element_store_.contains(elem)) {
return;
}
}
{
std::unique_lock write_lock(lock_);
element_store_.insert(elem);
}
}
void erase(const T &elem) {
std::unique_lock write_lock(lock_);
element_store_.erase(elem);
}
template <typename TFunc>
void for_each(const TFunc &func) {
std::unique_lock write_lock(lock_);
for (const auto &elem : element_store_) {
func(elem);
}
}
private:
std::unordered_set<T> element_store_;
RWLock lock_{RWLock::Priority::READ};
};
} // namespace memgraph::utils