From df3274d78fa615a9f63e731ba2e28b318166fcf2 Mon Sep 17 00:00:00 2001
From: gvolfing <gabor.volfinger@memgraph.io>
Date: Wed, 8 Nov 2023 14:43:06 +0100
Subject: [PATCH] Make the metadata storing objects threadsafe

The objects stored_node_labels_ and stored_edge_types_ can be accesses
through separate threads but it was not safe to do so. This commit
replaces the standard containers with threadsafe ones.
---
 src/storage/v2/disk/storage.cpp           |  2 +-
 src/storage/v2/inmemory/storage.cpp       |  4 +-
 src/storage/v2/storage.cpp                | 10 +---
 src/storage/v2/storage.hpp                |  5 +-
 src/storage/v2/vertex_accessor.cpp        |  2 +-
 src/utils/synchronized_metadata_store.hpp | 65 +++++++++++++++++++++++
 6 files changed, 74 insertions(+), 14 deletions(-)
 create mode 100644 src/utils/synchronized_metadata_store.hpp

diff --git a/src/storage/v2/disk/storage.cpp b/src/storage/v2/disk/storage.cpp
index 8dca66b29..ff9fc6f0c 100644
--- a/src/storage/v2/disk/storage.cpp
+++ b/src/storage/v2/disk/storage.cpp
@@ -944,7 +944,7 @@ Result<EdgeAccessor> DiskStorage::DiskAccessor::CreateEdge(VertexAccessor *from,
   transaction_.manyDeltasCache.Invalidate(from_vertex, edge_type, EdgeDirection::OUT);
   transaction_.manyDeltasCache.Invalidate(to_vertex, edge_type, EdgeDirection::IN);
 
-  storage_->stored_edge_types_.insert(edge_type);
+  storage_->stored_edge_types_.try_insert(edge_type);
   storage_->edge_count_.fetch_add(1, std::memory_order_acq_rel);
 
   return EdgeAccessor(edge, edge_type, from_vertex, to_vertex, storage_, &transaction_);
diff --git a/src/storage/v2/inmemory/storage.cpp b/src/storage/v2/inmemory/storage.cpp
index 5ed493432..797ca844f 100644
--- a/src/storage/v2/inmemory/storage.cpp
+++ b/src/storage/v2/inmemory/storage.cpp
@@ -278,7 +278,7 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdge(VertexAccesso
     if (to_vertex->deleted) return Error::DELETED_OBJECT;
   }
 
-  storage_->stored_edge_types_.insert(edge_type);
+  storage_->stored_edge_types_.try_insert(edge_type);
   auto *mem_storage = static_cast<InMemoryStorage *>(storage_);
   auto gid = storage::Gid::FromUint(mem_storage->edge_id_.fetch_add(1, std::memory_order_acq_rel));
   EdgeRef edge(gid);
@@ -343,7 +343,7 @@ Result<EdgeAccessor> InMemoryStorage::InMemoryAccessor::CreateEdgeEx(VertexAcces
     if (to_vertex->deleted) return Error::DELETED_OBJECT;
   }
 
-  storage_->stored_edge_types_.insert(edge_type);
+  storage_->stored_edge_types_.try_insert(edge_type);
 
   // NOTE: When we update the next `edge_id_` here we perform a RMW
   // (read-modify-write) operation that ISN'T atomic! But, that isn't an issue
diff --git a/src/storage/v2/storage.cpp b/src/storage/v2/storage.cpp
index 20d458e40..e07d96065 100644
--- a/src/storage/v2/storage.cpp
+++ b/src/storage/v2/storage.cpp
@@ -106,19 +106,13 @@ std::optional<uint64_t> Storage::Accessor::GetTransactionId() const {
 
 std::vector<LabelId> Storage::Accessor::ListAllPossiblyPresentVertexLabels() const {
   std::vector<LabelId> vertex_labels;
-  vertex_labels.reserve(storage_->stored_node_labels_.size());
-  for (const auto label : storage_->stored_node_labels_) {
-    vertex_labels.push_back(label);
-  }
+  storage_->stored_node_labels_.for_each([&vertex_labels](const auto &label) { vertex_labels.push_back(label); });
   return vertex_labels;
 }
 
 std::vector<EdgeTypeId> Storage::Accessor::ListAllPossiblyPresentEdgeTypes() const {
   std::vector<EdgeTypeId> edge_types;
-  edge_types.reserve(storage_->stored_edge_types_.size());
-  for (const auto edge_type : storage_->stored_edge_types_) {
-    edge_types.push_back(edge_type);
-  }
+  storage_->stored_edge_types_.for_each([&edge_types](const auto &type) { edge_types.push_back(type); });
   return edge_types;
 }
 
diff --git a/src/storage/v2/storage.hpp b/src/storage/v2/storage.hpp
index 0499b4665..b1bee1a6a 100644
--- a/src/storage/v2/storage.hpp
+++ b/src/storage/v2/storage.hpp
@@ -40,6 +40,7 @@
 #include "utils/event_histogram.hpp"
 #include "utils/resource_lock.hpp"
 #include "utils/scheduler.hpp"
+#include "utils/synchronized_metadata_store.hpp"
 #include "utils/timer.hpp"
 #include "utils/uuid.hpp"
 
@@ -407,8 +408,8 @@ class Storage {
   // be present in the database.
 
   // TODO(gvolfing): check if this would be faster with flat_maps.
-  std::unordered_set<LabelId> stored_node_labels_;
-  std::unordered_set<EdgeTypeId> stored_edge_types_;
+  utils::SynchronizedMetaDataStore<LabelId> stored_node_labels_;
+  utils::SynchronizedMetaDataStore<EdgeTypeId> stored_edge_types_;
 
   std::atomic<uint64_t> vertex_id_{0};
   std::atomic<uint64_t> edge_id_{0};
diff --git a/src/storage/v2/vertex_accessor.cpp b/src/storage/v2/vertex_accessor.cpp
index 91ffd547e..a3527ff38 100644
--- a/src/storage/v2/vertex_accessor.cpp
+++ b/src/storage/v2/vertex_accessor.cpp
@@ -109,7 +109,7 @@ Result<bool> VertexAccessor::AddLabel(LabelId label) {
 
   CreateAndLinkDelta(transaction_, vertex_, Delta::RemoveLabelTag(), label);
   vertex_->labels.push_back(label);
-  storage_->stored_node_labels_.insert(label);
+  storage_->stored_node_labels_.try_insert(label);
 
   /// TODO: some by pointers, some by reference => not good, make it better
   storage_->constraints_.unique_constraints_->UpdateOnAddLabel(label, *vertex_, transaction_->start_timestamp);
diff --git a/src/utils/synchronized_metadata_store.hpp b/src/utils/synchronized_metadata_store.hpp
new file mode 100644
index 000000000..0c0a85c21
--- /dev/null
+++ b/src/utils/synchronized_metadata_store.hpp
@@ -0,0 +1,65 @@
+// Copyright 2023 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+#include <mutex>
+#include <shared_mutex>
+#include <unordered_set>
+
+#include "utils/rw_lock.hpp"
+#include "utils/synchronized.hpp"
+
+namespace memgraph::utils {
+
+template <typename T>
+class SynchronizedMetaDataStore {
+ public:
+  SynchronizedMetaDataStore() = default;
+  ~SynchronizedMetaDataStore() = default;
+
+  SynchronizedMetaDataStore(const SynchronizedMetaDataStore &) = delete;
+  SynchronizedMetaDataStore(SynchronizedMetaDataStore &&) = delete;
+  SynchronizedMetaDataStore &operator=(const SynchronizedMetaDataStore &) = delete;
+  SynchronizedMetaDataStore &operator=(SynchronizedMetaDataStore &&) = delete;
+
+  void try_insert(const T &elem) {
+    {
+      std::shared_lock read_lock(lock_);
+      if (element_store_.contains(elem)) {
+        return;
+      }
+    }
+    {
+      std::unique_lock write_lock(lock_);
+      element_store_.insert(elem);
+    }
+  }
+
+  void erase(const T &elem) {
+    std::unique_lock write_lock(lock_);
+    element_store_.erase(elem);
+  }
+
+  template <typename TFunc>
+  void for_each(const TFunc &func) {
+    std::unique_lock write_lock(lock_);
+    for (const auto &elem : element_store_) {
+      func(elem);
+    }
+  }
+
+ private:
+  std::unordered_set<T> element_store_;
+  RWLock lock_{RWLock::Priority::READ};
+};
+
+}  // namespace memgraph::utils