From 8bc8e867e48d9706b0519c4c370ed77811f75335 Mon Sep 17 00:00:00 2001
From: Gareth Andrew Lloyd <gareth.lloyd@memgraph.io>
Date: Thu, 14 Mar 2024 18:21:59 +0000
Subject: [PATCH 1/2] Pmr allocator unify (#1801)

Query allocator and evaluation allocator were different.
After analysis, it was determined that they should be the same; this will
help future development reduce TypedValue copies during queries.

Changes:
- Common allocator: a PoolResource backed by a MonotonicResource (see the
  sketches below)
- Optimized Pool: alloc/dealloc are now O(1), because all chunks in a Pool
  form a single free list
- A second PoolResource that uses bin sizing; memory usage is not as tight,
  but bin selection is O(1)
- jemalloc's background thread is now enabled to make sure decay and the
  return of memory to the OS happen
- Optimized PropertyValue to be faster at destruction/copy/move
- Fewer temporary memory allocations:
  - The CSV reader now maintains a single line buffer that it reuses across
    line reads
  - Writing out bolt values now reuses a values buffer
  - Evaluating an int no longer makes temporary strings for error messages
    it most likely never throws
  - ExpandVariable will reuse the existing edge list in the frame if one
    existed
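
A minimal sketch of the allocator chaining described above, using std::pmr
types as stand-ins for the utils:: counterparts; the name
QueryAllocatorSketch and the 4 KiB initial size are illustrative, not the
patch's exact API:

    #include <memory_resource>

    // A pool refilled by a monotonic arena, which in turn falls back to
    // the default upstream resource. Deallocated blocks return to the
    // pool for reuse; the arena releases its memory only on destruction.
    class QueryAllocatorSketch {
     public:
      std::pmr::memory_resource *resource() { return &pool_; }

     private:
      std::pmr::monotonic_buffer_resource monotonic_{
          4UL * 1024UL, std::pmr::new_delete_resource()};
      std::pmr::unsynchronized_pool_resource pool_{&monotonic_};
    };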
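
The O(1) alloc/dealloc claim follows from keeping every free block in one
intrusive free list; a hedged sketch of the idea, not the patch's exact
Pool code:

    // Each free block stores a pointer to the next free block in its own
    // storage, so both operations are a single pointer swap.
    struct FreeListSketch {
      void *head = nullptr;

      void *Allocate() {  // O(1): pop the head block
        void *block = head;
        if (block != nullptr) head = *static_cast<void **>(block);
        return block;  // nullptr means the pool must grab a new chunk
      }

      void Deallocate(void *block) {  // O(1): push the block back
        *static_cast<void **>(block) = head;
        head = block;
      }
    };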
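
For the bin-sized PoolResource, O(1) bin selection can be done by rounding
the request up to a power of two and indexing bins by log2. This sizing
policy is an assumption for illustration, not necessarily the exact one
used:

    #include <bit>
    #include <cstddef>

    // Bins hold blocks of 1, 2, 4, ... bytes; selection is two bit ops.
    inline std::size_t BinIndex(std::size_t bytes) {
      std::size_t const rounded = std::bit_ceil(bytes);  // next power of 2
      return static_cast<std::size_t>(std::countr_zero(rounded));
    }

This also shows the trade-off named above: rounding up can waste up to
half a block per allocation, but avoids any search over bin sizes.
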
---
 CMakeLists.txt                       |  13 ++
 libs/setup.sh                        |   4 +-
 src/csv/include/csv/parsing.hpp      |   4 +-
 src/csv/parsing.cpp                  |  37 +++-
 src/glue/SessionHL.cpp               |  21 +-
 src/query/interpret/eval.cpp         |   6 +-
 src/query/interpret/eval.hpp         |   2 +-
 src/query/interpreter.cpp            | 193 +++-------------
 src/query/interpreter.hpp            |  88 +++++---
 src/query/plan/operator.cpp          |  50 +++--
 src/query/plan/operator.hpp          |  19 +-
 src/query/trigger.cpp                |  23 +-
 src/query/trigger.hpp                |   2 +-
 src/storage/v2/property_value.hpp    | 225 +++++++++++++------
 src/utils/memory.cpp                 | 317 ++++++++++++---------------
 src/utils/memory.hpp                 | 298 +++++++++++++++++--------
 src/utils/tag.hpp                    |  32 +++
 tests/benchmark/query/execution.cpp  |   4 +-
 tests/benchmark/skip_list_vs_stl.cpp |  14 +-
 tests/e2e/memory/workloads.yaml      |  20 +-
 tests/mgbench/runners.py             |   2 +
 tests/unit/property_value_v2.cpp     |   4 +-
 tests/unit/utils_memory.cpp          | 130 +----------
 23 files changed, 738 insertions(+), 770 deletions(-)
 create mode 100644 src/utils/tag.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 028406447..c02039497 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -300,6 +300,19 @@ endif()
 
 option(ENABLE_JEMALLOC "Use jemalloc" ON)
 
+option(MG_MEMORY_PROFILE "If the build should be set up for memory profiling" OFF)
+if (MG_MEMORY_PROFILE AND ENABLE_JEMALLOC)
+    message(STATUS "Jemalloc has been disabled because MG_MEMORY_PROFILE is enabled")
+    set(ENABLE_JEMALLOC OFF)
+endif ()
+if (MG_MEMORY_PROFILE AND ASAN)
+    message(STATUS "ASAN has been disabled because MG_MEMORY_PROFILE is enabled")
+    set(ASAN OFF)
+endif ()
+if (MG_MEMORY_PROFILE)
+    add_compile_definitions(MG_MEMORY_PROFILE)
+endif ()
+
 if (ASAN)
   message(WARNING "Disabling jemalloc as it doesn't work well with ASAN")
   set(ENABLE_JEMALLOC OFF)
diff --git a/libs/setup.sh b/libs/setup.sh
index e1e1243af..e23c5efef 100755
--- a/libs/setup.sh
+++ b/libs/setup.sh
@@ -268,13 +268,13 @@ repo_clone_try_double "${primary_urls[jemalloc]}" "${secondary_urls[jemalloc]}"
 pushd jemalloc
 
 ./autogen.sh
-MALLOC_CONF="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
+MALLOC_CONF="background_thread:true,retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000" \
 ./configure \
   --disable-cxx \
   --with-lg-page=12 \
   --with-lg-hugepage=21 \
   --enable-shared=no --prefix=$working_dir \
-  --with-malloc-conf="retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
+  --with-malloc-conf="background_thread:true,retain:false,percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000"
 
 make -j$CPUS install
 popd
diff --git a/src/csv/include/csv/parsing.hpp b/src/csv/include/csv/parsing.hpp
index 66f2913c8..0accc616d 100644
--- a/src/csv/include/csv/parsing.hpp
+++ b/src/csv/include/csv/parsing.hpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -119,6 +119,8 @@ class Reader {
   auto GetHeader() const -> Header const &;
   auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Row>;
 
+  void Reset();
+
  private:
   // Some implementation issues that need clearing up, but this is mainly because
   // I don't want `boost/iostreams/filtering_stream.hpp` included in this header file
diff --git a/src/csv/parsing.cpp b/src/csv/parsing.cpp
index 6d03dc7fd..6961a42e4 100644
--- a/src/csv/parsing.cpp
+++ b/src/csv/parsing.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -34,6 +34,10 @@ struct Reader::impl {
 
   [[nodiscard]] bool HasHeader() const { return read_config_.with_header; }
   [[nodiscard]] auto Header() const -> Header const & { return header_; }
+  void Reset() {
+    line_buffer_.clear();
+    line_buffer_.shrink_to_fit();
+  }
 
   auto GetNextRow(utils::MemoryResource *mem) -> std::optional<Reader::Row>;
 
@@ -42,7 +46,7 @@ struct Reader::impl {
 
   void TryInitializeHeader();
 
-  std::optional<utils::pmr::string> GetNextLine(utils::MemoryResource *mem);
+  bool GetNextLine();
 
   ParsingResult ParseHeader();
 
@@ -55,6 +59,8 @@ struct Reader::impl {
   Config read_config_;
   uint64_t line_count_{1};
   uint16_t number_of_columns_{0};
+  uint64_t estimated_number_of_columns_{0};
+  utils::pmr::string line_buffer_{memory_};
   Reader::Header header_{memory_};
 };
 
@@ -129,17 +135,16 @@ void Reader::impl::InitializeStream() {
   MG_ASSERT(csv_stream_.is_complete(), "Should be 'complete' for correct operation");
 }
 
-std::optional<utils::pmr::string> Reader::impl::GetNextLine(utils::MemoryResource *mem) {
-  utils::pmr::string line(mem);
-  if (!std::getline(csv_stream_, line)) {
+bool Reader::impl::GetNextLine() {
+  if (!std::getline(csv_stream_, line_buffer_)) {
     // reached end of file or an I/0 error occurred
     if (!csv_stream_.good()) {
       csv_stream_.reset();  // this will close the file_stream_ and clear the chain
     }
-    return std::nullopt;
+    return false;
   }
   ++line_count_;
-  return std::move(line);
+  return true;
 }
 
 Reader::ParsingResult Reader::impl::ParseHeader() {
@@ -170,6 +175,8 @@ void Reader::impl::TryInitializeHeader() {
 
 const Reader::Header &Reader::GetHeader() const { return pimpl->Header(); }
 
+void Reader::Reset() { pimpl->Reset(); }
+
 namespace {
 enum class CsvParserState : uint8_t { INITIAL_FIELD, NEXT_FIELD, QUOTING, EXPECT_DELIMITER, DONE };
 
@@ -179,6 +186,8 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
   utils::pmr::vector<utils::pmr::string> row(mem);
   if (number_of_columns_ != 0) {
     row.reserve(number_of_columns_);
+  } else if (estimated_number_of_columns_ != 0) {
+    row.reserve(estimated_number_of_columns_);
   }
 
   utils::pmr::string column(memory_);
@@ -186,13 +195,12 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
   auto state = CsvParserState::INITIAL_FIELD;
 
   do {
-    const auto maybe_line = GetNextLine(mem);
-    if (!maybe_line) {
+    if (!GetNextLine()) {
       // The whole file was processed.
       break;
     }
 
-    std::string_view line_string_view = *maybe_line;
+    std::string_view line_string_view = line_buffer_;
 
     // remove '\r' from the end in case we have dos file format
     if (line_string_view.back() == '\r') {
@@ -312,6 +320,11 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
                       fmt::format("Expected {:d} columns in row {:d}, but got {:d}", number_of_columns_,
                                   line_count_ - 1, row.size()));
   }
+  // To avoid unnecessary dynamic growth of the row, remember the number of
+  // columns for future calls
+  if (number_of_columns_ == 0 && estimated_number_of_columns_ == 0) {
+    estimated_number_of_columns_ = row.size();
+  }
 
   return std::move(row);
 }
@@ -319,7 +332,7 @@ Reader::ParsingResult Reader::impl::ParseRow(utils::MemoryResource *mem) {
 std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem) {
   auto row = ParseRow(mem);
 
-  if (row.HasError()) {
+  if (row.HasError()) [[unlikely]] {
     if (!read_config_.ignore_bad) {
       throw CsvReadException("CSV Reader: Bad row at line {:d}: {}", line_count_ - 1, row.GetError().message);
     }
@@ -333,7 +346,7 @@ std::optional<Reader::Row> Reader::impl::GetNextRow(utils::MemoryResource *mem)
     } while (row.HasError());
   }
 
-  if (row->empty()) {
+  if (row->empty()) [[unlikely]] {
     // reached end of file
     return std::nullopt;
   }
diff --git a/src/glue/SessionHL.cpp b/src/glue/SessionHL.cpp
index 6c901516c..6a48f15ca 100644
--- a/src/glue/SessionHL.cpp
+++ b/src/glue/SessionHL.cpp
@@ -59,12 +59,14 @@ class TypedValueResultStreamBase {
  public:
   explicit TypedValueResultStreamBase(memgraph::storage::Storage *storage);
 
-  std::vector<memgraph::communication::bolt::Value> DecodeValues(
-      const std::vector<memgraph::query::TypedValue> &values) const;
+  void DecodeValues(const std::vector<memgraph::query::TypedValue> &values);
+
+  auto AccessValues() const -> std::vector<memgraph::communication::bolt::Value> const & { return decoded_values_; }
 
  protected:
   // NOTE: Needed only for ToBoltValue conversions
   memgraph::storage::Storage *storage_;
+  std::vector<memgraph::communication::bolt::Value> decoded_values_;
 };
 
 /// Wrapper around TEncoder which converts TypedValue to Value
@@ -75,16 +77,18 @@ class TypedValueResultStream : public TypedValueResultStreamBase {
   TypedValueResultStream(TEncoder *encoder, memgraph::storage::Storage *storage)
       : TypedValueResultStreamBase{storage}, encoder_(encoder) {}
 
-  void Result(const std::vector<memgraph::query::TypedValue> &values) { encoder_->MessageRecord(DecodeValues(values)); }
+  void Result(const std::vector<memgraph::query::TypedValue> &values) {
+    DecodeValues(values);
+    encoder_->MessageRecord(AccessValues());
+  }
 
  private:
   TEncoder *encoder_;
 };
 
-std::vector<memgraph::communication::bolt::Value> TypedValueResultStreamBase::DecodeValues(
-    const std::vector<memgraph::query::TypedValue> &values) const {
-  std::vector<memgraph::communication::bolt::Value> decoded_values;
-  decoded_values.reserve(values.size());
+void TypedValueResultStreamBase::DecodeValues(const std::vector<memgraph::query::TypedValue> &values) {
+  decoded_values_.reserve(values.size());
+  decoded_values_.clear();
   for (const auto &v : values) {
     auto maybe_value = memgraph::glue::ToBoltValue(v, storage_, memgraph::storage::View::NEW);
     if (maybe_value.HasError()) {
@@ -99,9 +103,8 @@ std::vector<memgraph::communication::bolt::Value> TypedValueResultStreamBase::De
           throw memgraph::communication::bolt::ClientError("Unexpected storage error when streaming results.");
       }
     }
-    decoded_values.emplace_back(std::move(*maybe_value));
+    decoded_values_.emplace_back(std::move(*maybe_value));
   }
-  return decoded_values;
 }
 
 TypedValueResultStreamBase::TypedValueResultStreamBase(memgraph::storage::Storage *storage) : storage_(storage) {}
diff --git a/src/query/interpret/eval.cpp b/src/query/interpret/eval.cpp
index 8bd308420..7c5d838a5 100644
--- a/src/query/interpret/eval.cpp
+++ b/src/query/interpret/eval.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -13,12 +13,12 @@
 
 namespace memgraph::query {
 
-int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, const std::string &what) {
+int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, std::string_view what) {
   TypedValue value = expr->Accept(*evaluator);
   try {
     return value.ValueInt();
   } catch (TypedValueException &e) {
-    throw QueryRuntimeException(what + " must be an int");
+    throw QueryRuntimeException(std::string(what) + " must be an int");
   }
 }
 
diff --git a/src/query/interpret/eval.hpp b/src/query/interpret/eval.hpp
index 2a9fb289f..07a71412c 100644
--- a/src/query/interpret/eval.hpp
+++ b/src/query/interpret/eval.hpp
@@ -1209,7 +1209,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
 /// @param what - Name of what's getting evaluated. Used for user feedback (via
 ///               exception) when the evaluated value is not an int.
 /// @throw QueryRuntimeException if expression doesn't evaluate to an int.
-int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, const std::string &what);
+int64_t EvaluateInt(ExpressionEvaluator *evaluator, Expression *expr, std::string_view what);
 
 std::optional<size_t> EvaluateMemoryLimit(ExpressionVisitor<TypedValue> &eval, Expression *memory_limit,
                                           size_t memory_scale);
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index a5c81cc72..1322a7b99 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -246,27 +246,6 @@ std::optional<std::string> GetOptionalStringValue(query::Expression *expression,
   return {};
 };
 
-bool IsAllShortestPathsQuery(const std::vector<memgraph::query::Clause *> &clauses) {
-  for (const auto &clause : clauses) {
-    if (clause->GetTypeInfo() != Match::kType) {
-      continue;
-    }
-    auto *match_clause = utils::Downcast<Match>(clause);
-    for (const auto &pattern : match_clause->patterns_) {
-      for (const auto &atom : pattern->atoms_) {
-        if (atom->GetTypeInfo() != EdgeAtom::kType) {
-          continue;
-        }
-        auto *edge_atom = utils::Downcast<EdgeAtom>(atom);
-        if (edge_atom->type_ == EdgeAtom::Type::ALL_SHORTEST_PATHS) {
-          return true;
-        }
-      }
-    }
-  }
-  return false;
-}
-
 inline auto convertFromCoordinatorToReplicationMode(const CoordinatorQuery::SyncMode &sync_mode)
     -> replication_coordination_glue::ReplicationMode {
   switch (sync_mode) {
@@ -1733,8 +1712,7 @@ struct PullPlan {
                     std::shared_ptr<QueryUserOrRole> user_or_role, std::atomic<TransactionStatus> *transaction_status,
                     std::shared_ptr<utils::AsyncTimer> tx_timer,
                     TriggerContextCollector *trigger_context_collector = nullptr,
-                    std::optional<size_t> memory_limit = {}, bool use_monotonic_memory = true,
-                    FrameChangeCollector *frame_change_collector_ = nullptr);
+                    std::optional<size_t> memory_limit = {}, FrameChangeCollector *frame_change_collector_ = nullptr);
 
   std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
                                                         const std::vector<Symbol> &output_symbols,
@@ -1759,26 +1737,17 @@ struct PullPlan {
   // we have to keep track of any unsent results from previous `PullPlan::Pull`
   // manually by using this flag.
   bool has_unsent_results_ = false;
-
-  // In the case of LOAD CSV, we want to use only PoolResource without MonotonicMemoryResource
-  // to reuse allocated memory. As LOAD CSV is processing row by row
-  // it is possible to reduce memory usage significantly if MemoryResource deals with memory allocation
-  // can reuse memory that was allocated on processing the first row on all subsequent rows.
-  // This flag signals to `PullPlan::Pull` which MemoryResource to use
-  bool use_monotonic_memory_;
 };
 
 PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters &parameters, const bool is_profile_query,
                    DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
                    std::shared_ptr<QueryUserOrRole> user_or_role, std::atomic<TransactionStatus> *transaction_status,
                    std::shared_ptr<utils::AsyncTimer> tx_timer, TriggerContextCollector *trigger_context_collector,
-                   const std::optional<size_t> memory_limit, bool use_monotonic_memory,
-                   FrameChangeCollector *frame_change_collector)
+                   const std::optional<size_t> memory_limit, FrameChangeCollector *frame_change_collector)
     : plan_(plan),
       cursor_(plan->plan().MakeCursor(execution_memory)),
       frame_(plan->symbol_table().max_position(), execution_memory),
-      memory_limit_(memory_limit),
-      use_monotonic_memory_(use_monotonic_memory) {
+      memory_limit_(memory_limit) {
   ctx_.db_accessor = dba;
   ctx_.symbol_table = plan->symbol_table();
   ctx_.evaluation_context.timestamp = QueryTimestamp();
@@ -1804,6 +1773,7 @@ PullPlan::PullPlan(const std::shared_ptr<PlanWrapper> plan, const Parameters &pa
   ctx_.is_profile_query = is_profile_query;
   ctx_.trigger_context_collector = trigger_context_collector;
   ctx_.frame_change_collector = frame_change_collector;
+  ctx_.evaluation_context.memory = execution_memory;
 }
 
 std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
@@ -1827,43 +1797,14 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
         }
       }};
 
-  // Set up temporary memory for a single Pull. Initial memory comes from the
-  // stack. 256 KiB should fit on the stack and should be more than enough for a
-  // single `Pull`.
-  static constexpr size_t stack_size = 256UL * 1024UL;
-  char stack_data[stack_size];
-
-  utils::ResourceWithOutOfMemoryException resource_with_exception;
-  utils::MonotonicBufferResource monotonic_memory{&stack_data[0], stack_size, &resource_with_exception};
-  std::optional<utils::PoolResource> pool_memory;
-  static constexpr auto kMaxBlockPerChunks = 128;
-
-  if (!use_monotonic_memory_) {
-    pool_memory.emplace(kMaxBlockPerChunks, kExecutionPoolMaxBlockSize, &resource_with_exception,
-                        &resource_with_exception);
-  } else {
-    // We can throw on every query because a simple queries for deleting will use only
-    // the stack allocated buffer.
-    // Also, we want to throw only when the query engine requests more memory and not the storage
-    // so we add the exception to the allocator.
-    // TODO (mferencevic): Tune the parameters accordingly.
-    pool_memory.emplace(kMaxBlockPerChunks, 1024, &monotonic_memory, &resource_with_exception);
-  }
-
-  ctx_.evaluation_context.memory = &*pool_memory;
-
   // Returns true if a result was pulled.
   const auto pull_result = [&]() -> bool { return cursor_->Pull(frame_, ctx_); };
 
-  const auto stream_values = [&]() {
-    // TODO: The streamed values should also probably use the above memory.
-    std::vector<TypedValue> values;
-    values.reserve(output_symbols.size());
-
-    for (const auto &symbol : output_symbols) {
-      values.emplace_back(frame_[symbol]);
+  auto values = std::vector<TypedValue>(output_symbols.size());
+  const auto stream_values = [&] {
+    for (auto const i : ranges::views::iota(0UL, output_symbols.size())) {
+      values[i] = frame_[output_symbols[i]];
     }
-
     stream->Result(values);
   };
 
@@ -1973,7 +1914,6 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper,
   std::function<void()> handler;
 
   if (query_upper == "BEGIN") {
-    ResetInterpreter();
     // TODO: Evaluate doing move(extras). Currently the extras is very small, but this will be important if it ever
     // becomes large.
     handler = [this, extras = extras] {
@@ -2051,30 +1991,6 @@ inline static void TryCaching(const AstStorage &ast_storage, FrameChangeCollecto
   }
 }
 
-bool IsLoadCsvQuery(const std::vector<memgraph::query::Clause *> &clauses) {
-  return std::any_of(clauses.begin(), clauses.end(),
-                     [](memgraph::query::Clause const *clause) { return clause->GetTypeInfo() == LoadCsv::kType; });
-}
-
-bool IsCallBatchedProcedureQuery(const std::vector<memgraph::query::Clause *> &clauses) {
-  EvaluationContext evaluation_context;
-
-  return std::ranges::any_of(clauses, [&evaluation_context](memgraph::query::Clause *clause) -> bool {
-    if (!(clause->GetTypeInfo() == CallProcedure::kType)) return false;
-    auto *call_procedure_clause = utils::Downcast<CallProcedure>(clause);
-
-    const auto &maybe_found = memgraph::query::procedure::FindProcedure(
-        procedure::gModuleRegistry, call_procedure_clause->procedure_name_, evaluation_context.memory);
-    if (!maybe_found) {
-      throw QueryRuntimeException("There is no procedure named '{}'.", call_procedure_clause->procedure_name_);
-    }
-    const auto &[module, proc] = *maybe_found;
-    if (!proc->info.is_batched) return false;
-    spdlog::trace("Using PoolResource for batched query procedure");
-    return true;
-  });
-}
-
 PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
                                  InterpreterContext *interpreter_context, CurrentDB &current_db,
                                  utils::MemoryResource *execution_memory, std::vector<Notification> *notifications,
@@ -2094,7 +2010,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
     spdlog::info("Running query with memory limit of {}", utils::GetReadableSize(*memory_limit));
   }
   auto clauses = cypher_query->single_query_->clauses_;
-  bool contains_csv = false;
   if (std::any_of(clauses.begin(), clauses.end(),
                   [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
     notifications->emplace_back(
@@ -2102,13 +2017,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
         "It's important to note that the parser parses the values as strings. It's up to the user to "
         "convert the parsed row values to the appropriate type. This can be done using the built-in "
         "conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
-    contains_csv = true;
   }
 
-  // If this is LOAD CSV query, use PoolResource without MonotonicMemoryResource as we want to reuse allocated memory
-  auto use_monotonic_memory =
-      !contains_csv && !IsCallBatchedProcedureQuery(clauses) && !IsAllShortestPathsQuery(clauses);
-
   MG_ASSERT(current_db.execution_db_accessor_, "Cypher query expects a current DB transaction");
   auto *dba =
       &*current_db
@@ -2147,7 +2057,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
       current_db.trigger_context_collector_ ? &*current_db.trigger_context_collector_ : nullptr;
   auto pull_plan = std::make_shared<PullPlan>(
       plan, parsed_query.parameters, false, dba, interpreter_context, execution_memory, std::move(user_or_role),
-      transaction_status, std::move(tx_timer), trigger_context_collector, memory_limit, use_monotonic_memory,
+      transaction_status, std::move(tx_timer), trigger_context_collector, memory_limit,
       frame_change_collector->IsTrackingValues() ? frame_change_collector : nullptr);
   return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
                        [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
@@ -2261,18 +2171,6 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
 
   auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
 
-  bool contains_csv = false;
-  auto clauses = cypher_query->single_query_->clauses_;
-  if (std::any_of(clauses.begin(), clauses.end(),
-                  [](const auto *clause) { return clause->GetTypeInfo() == LoadCsv::kType; })) {
-    contains_csv = true;
-  }
-
-  // If this is LOAD CSV, BatchedProcedure or AllShortest query, use PoolResource without MonotonicMemoryResource as we
-  // want to reuse allocated memory
-  auto use_monotonic_memory =
-      !contains_csv && !IsCallBatchedProcedureQuery(clauses) && !IsAllShortestPathsQuery(clauses);
-
   MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in PROFILE");
   EvaluationContext evaluation_context;
   evaluation_context.timestamp = QueryTimestamp();
@@ -2306,14 +2204,14 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
        // We want to execute the query we are profiling lazily, so we delay
        // the construction of the corresponding context.
        stats_and_total_time = std::optional<plan::ProfilingStatsWithTotalTime>{},
-       pull_plan = std::shared_ptr<PullPlanVector>(nullptr), transaction_status, use_monotonic_memory,
-       frame_change_collector, tx_timer = std::move(tx_timer)](
-          AnyStream *stream, std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
+       pull_plan = std::shared_ptr<PullPlanVector>(nullptr), transaction_status, frame_change_collector,
+       tx_timer = std::move(tx_timer)](AnyStream *stream,
+                                       std::optional<int> n) mutable -> std::optional<QueryHandlerResult> {
         // No output symbols are given so that nothing is streamed.
         if (!stats_and_total_time) {
           stats_and_total_time =
               PullPlan(plan, parameters, true, dba, interpreter_context, execution_memory, std::move(user_or_role),
-                       transaction_status, std::move(tx_timer), nullptr, memory_limit, use_monotonic_memory,
+                       transaction_status, std::move(tx_timer), nullptr, memory_limit,
                        frame_change_collector->IsTrackingValues() ? frame_change_collector : nullptr)
                   .Pull(stream, {}, {}, summary);
           pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time));
@@ -4276,6 +4174,7 @@ PreparedQuery PrepareShowDatabasesQuery(ParsedQuery parsed_query, InterpreterCon
 std::optional<uint64_t> Interpreter::GetTransactionId() const { return current_transaction_; }
 
 void Interpreter::BeginTransaction(QueryExtras const &extras) {
+  ResetInterpreter();
   const auto prepared_query = PrepareTransactionQuery("BEGIN", extras);
   prepared_query.query_handler(nullptr, {});
 }
@@ -4310,12 +4209,12 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
   const auto upper_case_query = utils::ToUpperCase(query_string);
   const auto trimmed_query = utils::Trim(upper_case_query);
   if (trimmed_query == "BEGIN" || trimmed_query == "COMMIT" || trimmed_query == "ROLLBACK") {
-    auto resource = utils::MonotonicBufferResource(kExecutionMemoryBlockSize);
-    auto prepared_query = PrepareTransactionQuery(trimmed_query, extras);
-    auto &query_execution =
-        query_executions_.emplace_back(QueryExecution::Create(std::move(resource), std::move(prepared_query)));
-    std::optional<int> qid =
-        in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
+    if (trimmed_query == "BEGIN") {
+      ResetInterpreter();
+    }
+    auto &query_execution = query_executions_.emplace_back(QueryExecution::Create());
+    query_execution->prepared_query = PrepareTransactionQuery(trimmed_query, extras);
+    auto qid = in_explicit_transaction_ ? static_cast<int>(query_executions_.size() - 1) : std::optional<int>{};
     return {query_execution->prepared_query->header, query_execution->prepared_query->privileges, qid, {}};
   }
 
@@ -4345,35 +4244,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
         ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query);
     auto parsing_time = parsing_timer.Elapsed().count();
 
-    CypherQuery const *const cypher_query = [&]() -> CypherQuery * {
-      if (auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query)) {
-        return cypher_query;
-      }
-      if (auto *profile_query = utils::Downcast<ProfileQuery>(parsed_query.query)) {
-        return profile_query->cypher_query_;
-      }
-      return nullptr;
-    }();  // IILE
-
-    auto const [usePool, hasAllShortestPaths] = [&]() -> std::pair<bool, bool> {
-      if (!cypher_query) {
-        return {false, false};
-      }
-      auto const &clauses = cypher_query->single_query_->clauses_;
-      bool hasAllShortestPaths = IsAllShortestPathsQuery(clauses);
-      // Using PoolResource without MonotonicMemoryResouce for LOAD CSV reduces memory usage.
-      bool usePool = hasAllShortestPaths || IsCallBatchedProcedureQuery(clauses) || IsLoadCsvQuery(clauses);
-      return {usePool, hasAllShortestPaths};
-    }();  // IILE
-
     // Setup QueryExecution
-    // its MemoryResource is mostly used for allocations done on Frame and storing `row`s
-    if (usePool) {
-      query_executions_.emplace_back(QueryExecution::Create(utils::PoolResource(128, kExecutionPoolMaxBlockSize)));
-    } else {
-      query_executions_.emplace_back(QueryExecution::Create(utils::MonotonicBufferResource(kExecutionMemoryBlockSize)));
-    }
-
+    query_executions_.emplace_back(QueryExecution::Create());
     auto &query_execution = query_executions_.back();
     query_execution_ptr = &query_execution;
 
@@ -4442,9 +4314,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
 
     utils::Timer planning_timer;
     PreparedQuery prepared_query;
-    utils::MemoryResource *memory_resource =
-        std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
-                   query_execution->execution_memory);
+    utils::MemoryResource *memory_resource = query_execution->execution_memory.resource();
     frame_change_collector_.reset();
     frame_change_collector_.emplace();
     if (utils::Downcast<CypherQuery>(parsed_query.query)) {
@@ -4455,10 +4325,10 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
       prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary,
                                            &query_execution->notifications, interpreter_context_, current_db_);
     } else if (utils::Downcast<ProfileQuery>(parsed_query.query)) {
-      prepared_query = PrepareProfileQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
-                                           &query_execution->notifications, interpreter_context_, current_db_,
-                                           &query_execution->execution_memory_with_exception, user_or_role_,
-                                           &transaction_status_, current_timeout_timer_, &*frame_change_collector_);
+      prepared_query =
+          PrepareProfileQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
+                              &query_execution->notifications, interpreter_context_, current_db_, memory_resource,
+                              user_or_role_, &transaction_status_, current_timeout_timer_, &*frame_change_collector_);
     } else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
       prepared_query = PrepareDumpQuery(std::move(parsed_query), current_db_);
     } else if (utils::Downcast<IndexQuery>(parsed_query.query)) {
@@ -4660,7 +4530,7 @@ void RunTriggersAfterCommit(dbms::DatabaseAccess db_acc, InterpreterContext *int
                             std::atomic<TransactionStatus> *transaction_status) {
   // Run the triggers
   for (const auto &trigger : db_acc->trigger_store()->AfterCommitTriggers().access()) {
-    utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
+    QueryAllocator execution_memory{};
 
     // create a new transaction for each trigger
     auto tx_acc = db_acc->Access();
@@ -4671,7 +4541,7 @@ void RunTriggersAfterCommit(dbms::DatabaseAccess db_acc, InterpreterContext *int
     auto trigger_context = original_trigger_context;
     trigger_context.AdaptForAccessor(&db_accessor);
     try {
-      trigger.Execute(&db_accessor, &execution_memory, flags::run_time::GetExecutionTimeout(),
+      trigger.Execute(&db_accessor, execution_memory.resource(), flags::run_time::GetExecutionTimeout(),
                       &interpreter_context->is_shutting_down, transaction_status, trigger_context);
     } catch (const utils::BasicException &exception) {
       spdlog::warn("Trigger '{}' failed with exception:\n{}", trigger.Name(), exception.what());
@@ -4825,11 +4695,12 @@ void Interpreter::Commit() {
   if (trigger_context) {
     // Run the triggers
     for (const auto &trigger : db->trigger_store()->BeforeCommitTriggers().access()) {
-      utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
+      QueryAllocator execution_memory{};
       AdvanceCommand();
       try {
-        trigger.Execute(&*current_db_.execution_db_accessor_, &execution_memory, flags::run_time::GetExecutionTimeout(),
-                        &interpreter_context_->is_shutting_down, &transaction_status_, *trigger_context);
+        trigger.Execute(&*current_db_.execution_db_accessor_, execution_memory.resource(),
+                        flags::run_time::GetExecutionTimeout(), &interpreter_context_->is_shutting_down,
+                        &transaction_status_, *trigger_context);
       } catch (const utils::BasicException &e) {
         throw utils::BasicException(
             fmt::format("Trigger '{}' caused the transaction to fail.\nException: {}", trigger.Name(), e.what()));
diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp
index f18bd6721..5366b4472 100644
--- a/src/query/interpreter.hpp
+++ b/src/query/interpreter.hpp
@@ -65,6 +65,54 @@ extern const Event SuccessfulQuery;
 
 namespace memgraph::query {
 
+struct QueryAllocator {
+  QueryAllocator() = default;
+  QueryAllocator(QueryAllocator const &) = delete;
+  QueryAllocator &operator=(QueryAllocator const &) = delete;
+
+  // No move: addresses of the pool & monotonic fields must be stable
+  QueryAllocator(QueryAllocator &&) = delete;
+  QueryAllocator &operator=(QueryAllocator &&) = delete;
+
+  auto resource() -> utils::MemoryResource * {
+#ifndef MG_MEMORY_PROFILE
+    return &pool;
+#else
+    return upstream_resource();
+#endif
+  }
+  auto resource_without_pool() -> utils::MemoryResource * {
+#ifndef MG_MEMORY_PROFILE
+    return &monotonic;
+#else
+    return upstream_resource();
+#endif
+  }
+  auto resource_without_pool_or_mono() -> utils::MemoryResource * { return upstream_resource(); }
+
+ private:
+  // At least one page, to ensure we are not sharing a page with other subsystems
+  static constexpr auto kMonotonicInitialSize = 4UL * 1024UL;
+  // TODO: We need to profile to check for good defaults; maybe PoolResource
+  //  also needs to be smarter. We expect more reuse of smaller objects than
+  //  of larger objects. 64*1024B is maybe wasteful, whereas 256*32B is maybe
+  //  sensible. Depends on the number of small objects expected.
+  static constexpr auto kPoolBlockPerChunk = 64UL;
+  static constexpr auto kPoolMaxBlockSize = 1024UL;
+
+  static auto upstream_resource() -> utils::MemoryResource * {
+    // singleton ResourceWithOutOfMemoryException
+    // explicitly backed by NewDeleteResource
+    static auto upstream = utils::ResourceWithOutOfMemoryException{utils::NewDeleteResource()};
+    return &upstream;
+  }
+
+#ifndef MG_MEMORY_PROFILE
+  memgraph::utils::MonotonicBufferResource monotonic{kMonotonicInitialSize, upstream_resource()};
+  memgraph::utils::PoolResource pool{kPoolBlockPerChunk, &monotonic, upstream_resource()};
+#endif
+};
+
 struct InterpreterContext;
 
 inline constexpr size_t kExecutionMemoryBlockSize = 1UL * 1024UL * 1024UL;
@@ -304,45 +352,25 @@ class Interpreter final {
   }
 
   struct QueryExecution {
-    std::variant<utils::MonotonicBufferResource, utils::PoolResource> execution_memory;
-    utils::ResourceWithOutOfMemoryException execution_memory_with_exception;
-    std::optional<PreparedQuery> prepared_query;
+    QueryAllocator execution_memory;  // NOTE: before all other fields which use this memory
 
+    std::optional<PreparedQuery> prepared_query;
     std::map<std::string, TypedValue> summary;
     std::vector<Notification> notifications;
 
-    static auto Create(std::variant<utils::MonotonicBufferResource, utils::PoolResource> memory_resource,
-                       std::optional<PreparedQuery> prepared_query = std::nullopt) -> std::unique_ptr<QueryExecution> {
-      return std::make_unique<QueryExecution>(std::move(memory_resource), std::move(prepared_query));
-    }
+    static auto Create() -> std::unique_ptr<QueryExecution> { return std::make_unique<QueryExecution>(); }
 
-    explicit QueryExecution(std::variant<utils::MonotonicBufferResource, utils::PoolResource> memory_resource,
-                            std::optional<PreparedQuery> prepared_query)
-        : execution_memory(std::move(memory_resource)), prepared_query{std::move(prepared_query)} {
-      std::visit(
-          [&](auto &memory_resource) {
-            execution_memory_with_exception = utils::ResourceWithOutOfMemoryException(&memory_resource);
-          },
-          execution_memory);
-    };
+    explicit QueryExecution() = default;
 
     QueryExecution(const QueryExecution &) = delete;
-    QueryExecution(QueryExecution &&) = default;
+    QueryExecution(QueryExecution &&) = delete;
     QueryExecution &operator=(const QueryExecution &) = delete;
-    QueryExecution &operator=(QueryExecution &&) = default;
+    QueryExecution &operator=(QueryExecution &&) = delete;
 
-    ~QueryExecution() {
-      // We should always release the execution memory AFTER we
-      // destroy the prepared query which is using that instance
-      // of execution memory.
-      prepared_query.reset();
-      std::visit([](auto &memory_resource) { memory_resource.Release(); }, execution_memory);
-    }
+    ~QueryExecution() = default;
 
     void CleanRuntimeData() {
-      if (prepared_query.has_value()) {
-        prepared_query.reset();
-      }
+      prepared_query.reset();
       notifications.clear();
     }
   };
@@ -413,9 +441,7 @@ std::map<std::string, TypedValue> Interpreter::Pull(TStream *result_stream, std:
   try {
     // Wrap the (statically polymorphic) stream type into a common type which
     // the handler knows.
-    AnyStream stream{result_stream,
-                     std::visit([](auto &execution_memory) -> utils::MemoryResource * { return &execution_memory; },
-                                query_execution->execution_memory)};
+    AnyStream stream{result_stream, query_execution->execution_memory.resource()};
     const auto maybe_res = query_execution->prepared_query->query_handler(&stream, n);
     // Stream is using execution memory of the query_execution which
     // can be deleted after its execution so the stream should be cleared
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 29f64f950..8e1b9f529 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -69,6 +69,7 @@
 #include "utils/pmr/vector.hpp"
 #include "utils/readable_size.hpp"
 #include "utils/string.hpp"
+#include "utils/tag.hpp"
 #include "utils/temporal.hpp"
 #include "utils/typeinfo.hpp"
 
@@ -864,17 +865,15 @@ bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
   SCOPED_PROFILE_OP_BY_REF(self_);
 
   // A helper function for expanding a node from an edge.
-  auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) {
+  auto pull_node = [this, &frame]<EdgeAtom::Direction direction>(const EdgeAccessor &new_edge,
+                                                                 utils::tag_value<direction>) {
     if (self_.common_.existing_node) return;
-    switch (direction) {
-      case EdgeAtom::Direction::IN:
-        frame[self_.common_.node_symbol] = new_edge.From();
-        break;
-      case EdgeAtom::Direction::OUT:
-        frame[self_.common_.node_symbol] = new_edge.To();
-        break;
-      case EdgeAtom::Direction::BOTH:
-        LOG_FATAL("Must indicate exact expansion direction here");
+    if constexpr (direction == EdgeAtom::Direction::IN) {
+      frame[self_.common_.node_symbol] = new_edge.From();
+    } else if constexpr (direction == EdgeAtom::Direction::OUT) {
+      frame[self_.common_.node_symbol] = new_edge.To();
+    } else {
+      LOG_FATAL("Must indicate exact expansion direction here");
     }
   };
 
@@ -893,7 +892,7 @@ bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
 #endif
 
       frame[self_.common_.edge_symbol] = edge;
-      pull_node(edge, EdgeAtom::Direction::IN);
+      pull_node(edge, utils::tag_v<EdgeAtom::Direction::IN>);
       return true;
     }
 
@@ -913,7 +912,7 @@ bool Expand::ExpandCursor::Pull(Frame &frame, ExecutionContext &context) {
       }
 #endif
       frame[self_.common_.edge_symbol] = edge;
-      pull_node(edge, EdgeAtom::Direction::OUT);
+      pull_node(edge, utils::tag_v<EdgeAtom::Direction::OUT>);
       return true;
     }
 
@@ -1007,12 +1006,12 @@ bool Expand::ExpandCursor::InitEdges(Frame &frame, ExecutionContext &context) {
           auto existing_node = *expansion_info_.existing_node;
 
           auto edges_result = UnwrapEdgesResult(vertex.InEdges(self_.view_, self_.common_.edge_types, existing_node));
-          in_edges_.emplace(edges_result.edges);
+          in_edges_.emplace(std::move(edges_result.edges));
           num_expanded_first = edges_result.expanded_count;
         }
       } else {
         auto edges_result = UnwrapEdgesResult(vertex.InEdges(self_.view_, self_.common_.edge_types));
-        in_edges_.emplace(edges_result.edges);
+        in_edges_.emplace(std::move(edges_result.edges));
         num_expanded_first = edges_result.expanded_count;
       }
       if (in_edges_) {
@@ -1026,12 +1025,12 @@ bool Expand::ExpandCursor::InitEdges(Frame &frame, ExecutionContext &context) {
         if (expansion_info_.existing_node) {
           auto existing_node = *expansion_info_.existing_node;
           auto edges_result = UnwrapEdgesResult(vertex.OutEdges(self_.view_, self_.common_.edge_types, existing_node));
-          out_edges_.emplace(edges_result.edges);
+          out_edges_.emplace(std::move(edges_result.edges));
           num_expanded_second = edges_result.expanded_count;
         }
       } else {
         auto edges_result = UnwrapEdgesResult(vertex.OutEdges(self_.view_, self_.common_.edge_types));
-        out_edges_.emplace(edges_result.edges);
+        out_edges_.emplace(std::move(edges_result.edges));
         num_expanded_second = edges_result.expanded_count;
       }
       if (out_edges_) {
@@ -1117,14 +1116,14 @@ auto ExpandFromVertex(const VertexAccessor &vertex, EdgeAtom::Direction directio
 
   if (direction != EdgeAtom::Direction::OUT) {
     auto edges = UnwrapEdgesResult(vertex.InEdges(view, edge_types)).edges;
-    if (edges.begin() != edges.end()) {
+    if (!edges.empty()) {
       chain_elements.emplace_back(wrapper(EdgeAtom::Direction::IN, std::move(edges)));
     }
   }
 
   if (direction != EdgeAtom::Direction::IN) {
     auto edges = UnwrapEdgesResult(vertex.OutEdges(view, edge_types)).edges;
-    if (edges.begin() != edges.end()) {
+    if (!edges.empty()) {
       chain_elements.emplace_back(wrapper(EdgeAtom::Direction::OUT, std::move(edges)));
     }
   }
@@ -1244,8 +1243,13 @@ class ExpandVariableCursor : public Cursor {
       }
 
       // reset the frame value to an empty edge list
-      auto *pull_memory = context.evaluation_context.memory;
-      frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory);
+      if (frame[self_.common_.edge_symbol].IsList()) {
+        // Preserve the list capacity if possible
+        frame[self_.common_.edge_symbol].ValueList().clear();
+      } else {
+        auto *pull_memory = context.evaluation_context.memory;
+        frame[self_.common_.edge_symbol] = TypedValue::TVector(pull_memory);
+      }
 
       return true;
     }
@@ -4474,9 +4478,8 @@ class UnwindCursor : public Cursor {
         TypedValue input_value = self_.input_expression_->Accept(evaluator);
         if (input_value.type() != TypedValue::Type::List)
           throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
-        // Copy the evaluted input_value_list to our vector.
-        // eval memory != query memory
-        input_value_ = input_value.ValueList();
+        // Move the evaluated input_value_list to our vector.
+        input_value_ = std::move(input_value.ValueList());
         input_value_it_ = input_value_.begin();
       }
 
@@ -5336,6 +5339,7 @@ class LoadCsvCursor : public Cursor {
             "1");
       }
       did_pull_ = true;
+      reader_->Reset();
     }
 
     auto row = reader_->GetNextRow(context.evaluation_context.memory);
diff --git a/src/query/plan/operator.hpp b/src/query/plan/operator.hpp
index 5a8ef0625..e502fbadd 100644
--- a/src/query/plan/operator.hpp
+++ b/src/query/plan/operator.hpp
@@ -76,18 +76,13 @@ using UniqueCursorPtr = std::unique_ptr<Cursor, std::function<void(Cursor *)>>;
 template <class TCursor, class... TArgs>
 std::unique_ptr<Cursor, std::function<void(Cursor *)>> MakeUniqueCursorPtr(utils::Allocator<TCursor> allocator,
                                                                            TArgs &&...args) {
-  auto *ptr = allocator.allocate(1);
-  try {
-    auto *cursor = new (ptr) TCursor(std::forward<TArgs>(args)...);
-    return std::unique_ptr<Cursor, std::function<void(Cursor *)>>(cursor, [allocator](Cursor *base_ptr) mutable {
-      auto *p = static_cast<TCursor *>(base_ptr);
-      p->~TCursor();
-      allocator.deallocate(p, 1);
-    });
-  } catch (...) {
-    allocator.deallocate(ptr, 1);
-    throw;
-  }
+  auto *cursor = allocator.template new_object<TCursor>(std::forward<TArgs>(args)...);
+  auto dtr = [allocator](Cursor *base_ptr) mutable {
+    auto *p = static_cast<TCursor *>(base_ptr);
+    allocator.delete_object(p);
+  };
+  // TODO: use something cheaper than std::function for the deleter
+  return std::unique_ptr<Cursor, std::function<void(Cursor *)>>(cursor, std::move(dtr));
 }
 
 class Once;
diff --git a/src/query/trigger.cpp b/src/query/trigger.cpp
index 437389128..151a33dad 100644
--- a/src/query/trigger.cpp
+++ b/src/query/trigger.cpp
@@ -191,9 +191,9 @@ std::shared_ptr<Trigger::TriggerPlan> Trigger::GetPlan(DbAccessor *db_accessor)
   return trigger_plan_;
 }
 
-void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory,
-                      const double max_execution_time_sec, std::atomic<bool> *is_shutting_down,
-                      std::atomic<TransactionStatus> *transaction_status, const TriggerContext &context) const {
+void Trigger::Execute(DbAccessor *dba, utils::MemoryResource *execution_memory, const double max_execution_time_sec,
+                      std::atomic<bool> *is_shutting_down, std::atomic<TransactionStatus> *transaction_status,
+                      const TriggerContext &context) const {
   if (!context.ShouldEventTrigger(event_type_)) {
     return;
   }
@@ -214,22 +214,7 @@ void Trigger::Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution
   ctx.is_shutting_down = is_shutting_down;
   ctx.transaction_status = transaction_status;
   ctx.is_profile_query = false;
-
-  // Set up temporary memory for a single Pull. Initial memory comes from the
-  // stack. 256 KiB should fit on the stack and should be more than enough for a
-  // single `Pull`.
-  static constexpr size_t stack_size = 256UL * 1024UL;
-  char stack_data[stack_size];
-
-  // We can throw on every query because a simple queries for deleting will use only
-  // the stack allocated buffer.
-  // Also, we want to throw only when the query engine requests more memory and not the storage
-  // so we add the exception to the allocator.
-  utils::ResourceWithOutOfMemoryException resource_with_exception;
-  utils::MonotonicBufferResource monotonic_memory(&stack_data[0], stack_size, &resource_with_exception);
-  // TODO (mferencevic): Tune the parameters accordingly.
-  utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
-  ctx.evaluation_context.memory = &pool_memory;
+  ctx.evaluation_context.memory = execution_memory;
 
   auto cursor = plan.plan().MakeCursor(execution_memory);
   Frame frame{plan.symbol_table().max_position(), execution_memory};
diff --git a/src/query/trigger.hpp b/src/query/trigger.hpp
index 91c74579e..24bbf50ee 100644
--- a/src/query/trigger.hpp
+++ b/src/query/trigger.hpp
@@ -39,7 +39,7 @@ struct Trigger {
                    utils::SkipList<QueryCacheEntry> *query_cache, DbAccessor *db_accessor,
                    const InterpreterConfig::Query &query_config, std::shared_ptr<QueryUserOrRole> owner);
 
-  void Execute(DbAccessor *dba, utils::MonotonicBufferResource *execution_memory, double max_execution_time_sec,
+  void Execute(DbAccessor *dba, utils::MemoryResource *execution_memory, double max_execution_time_sec,
                std::atomic<bool> *is_shutting_down, std::atomic<TransactionStatus> *transaction_status,
                const TriggerContext &context) const;
 
diff --git a/src/storage/v2/property_value.hpp b/src/storage/v2/property_value.hpp
index e48be008a..161ad151a 100644
--- a/src/storage/v2/property_value.hpp
+++ b/src/storage/v2/property_value.hpp
@@ -92,7 +92,28 @@ class PropertyValue {
   // TODO: Implement copy assignment operators for primitive types.
   // TODO: Implement copy and move assignment operators for non-primitive types.
 
-  ~PropertyValue() { DestroyValue(); }
+  ~PropertyValue() {
+    switch (type_) {
+      // destructors for primitive types do nothing
+      case Type::Null:
+      case Type::Bool:
+      case Type::Int:
+      case Type::Double:
+      case Type::TemporalData:
+        return;
+
+      // destructors for non-primitive types must run, since we used placement new
+      case Type::String:
+        std::destroy_at(&string_v.val_);
+        return;
+      case Type::List:
+        std::destroy_at(&list_v.val_);
+        return;
+      case Type::Map:
+        std::destroy_at(&map_v.val_);
+        return;
+    }
+  }
 
   Type type() const { return type_; }
 
@@ -189,8 +210,6 @@ class PropertyValue {
   }
 
  private:
-  void DestroyValue() noexcept;
-
   // NOTE: this may look strange but it is for better data layout
   //       https://eel.is/c++draft/class.union#general-note-1
   union {
@@ -357,13 +376,13 @@ inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.ty
       this->double_v.val_ = other.double_v.val_;
       return;
     case Type::String:
-      new (&string_v.val_) std::string(other.string_v.val_);
+      std::construct_at(&string_v.val_, other.string_v.val_);
       return;
     case Type::List:
-      new (&list_v.val_) std::vector<PropertyValue>(other.list_v.val_);
+      std::construct_at(&list_v.val_, other.list_v.val_);
       return;
     case Type::Map:
-      new (&map_v.val_) std::map<std::string, PropertyValue>(other.map_v.val_);
+      std::construct_at(&map_v.val_, other.map_v.val_);
       return;
     case Type::TemporalData:
       this->temporal_data_v.val_ = other.temporal_data_v.val_;
@@ -371,7 +390,7 @@ inline PropertyValue::PropertyValue(const PropertyValue &other) : type_(other.ty
   }
 }
 
-inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std::exchange(other.type_, Type::Null)) {
+inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(other.type_) {
   switch (type_) {
     case Type::Null:
       break;
@@ -386,15 +405,12 @@ inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std:
       break;
     case Type::String:
       std::construct_at(&string_v.val_, std::move(other.string_v.val_));
-      std::destroy_at(&other.string_v.val_);
       break;
     case Type::List:
       std::construct_at(&list_v.val_, std::move(other.list_v.val_));
-      std::destroy_at(&other.list_v.val_);
       break;
     case Type::Map:
       std::construct_at(&map_v.val_, std::move(other.map_v.val_));
-      std::destroy_at(&other.map_v.val_);
       break;
     case Type::TemporalData:
       temporal_data_v.val_ = other.temporal_data_v.val_;
@@ -403,38 +419,88 @@ inline PropertyValue::PropertyValue(PropertyValue &&other) noexcept : type_(std:
 }
 
 inline PropertyValue &PropertyValue::operator=(const PropertyValue &other) {
-  if (this == &other) return *this;
+  if (type_ == other.type_) {
+    if (this == &other) return *this;
+    switch (other.type_) {
+      case Type::Null:
+        break;
+      case Type::Bool:
+        bool_v.val_ = other.bool_v.val_;
+        break;
+      case Type::Int:
+        int_v.val_ = other.int_v.val_;
+        break;
+      case Type::Double:
+        double_v.val_ = other.double_v.val_;
+        break;
+      case Type::String:
+        string_v.val_ = other.string_v.val_;
+        break;
+      case Type::List:
+        list_v.val_ = other.list_v.val_;
+        break;
+      case Type::Map:
+        map_v.val_ = other.map_v.val_;
+        break;
+      case Type::TemporalData:
+        temporal_data_v.val_ = other.temporal_data_v.val_;
+        break;
+    }
+    return *this;
+  } else {
+    // destroy
+    switch (type_) {
+      case Type::Null:
+        break;
+      case Type::Bool:
+        break;
+      case Type::Int:
+        break;
+      case Type::Double:
+        break;
+      case Type::String:
+        std::destroy_at(&string_v.val_);
+        break;
+      case Type::List:
+        std::destroy_at(&list_v.val_);
+        break;
+      case Type::Map:
+        std::destroy_at(&map_v.val_);
+        break;
+      case Type::TemporalData:
+        break;
+    }
+    // construct
+    auto *new_this = std::launder(this);
+    switch (other.type_) {
+      case Type::Null:
+        break;
+      case Type::Bool:
+        new_this->bool_v.val_ = other.bool_v.val_;
+        break;
+      case Type::Int:
+        new_this->int_v.val_ = other.int_v.val_;
+        break;
+      case Type::Double:
+        new_this->double_v.val_ = other.double_v.val_;
+        break;
+      case Type::String:
+        std::construct_at(&new_this->string_v.val_, other.string_v.val_);
+        break;
+      case Type::List:
+        std::construct_at(&new_this->list_v.val_, other.list_v.val_);
+        break;
+      case Type::Map:
+        std::construct_at(&new_this->map_v.val_, other.map_v.val_);
+        break;
+      case Type::TemporalData:
+        new_this->temporal_data_v.val_ = other.temporal_data_v.val_;
+        break;
+    }
 
-  DestroyValue();
-  type_ = other.type_;
-
-  switch (other.type_) {
-    case Type::Null:
-      break;
-    case Type::Bool:
-      this->bool_v.val_ = other.bool_v.val_;
-      break;
-    case Type::Int:
-      this->int_v.val_ = other.int_v.val_;
-      break;
-    case Type::Double:
-      this->double_v.val_ = other.double_v.val_;
-      break;
-    case Type::String:
-      new (&string_v.val_) std::string(other.string_v.val_);
-      break;
-    case Type::List:
-      new (&list_v.val_) std::vector<PropertyValue>(other.list_v.val_);
-      break;
-    case Type::Map:
-      new (&map_v.val_) std::map<std::string, PropertyValue>(other.map_v.val_);
-      break;
-    case Type::TemporalData:
-      this->temporal_data_v.val_ = other.temporal_data_v.val_;
-      break;
+    new_this->type_ = other.type_;
+    return *new_this;
   }
-
-  return *this;
 }
 
 inline PropertyValue &PropertyValue::operator=(PropertyValue &&other) noexcept {
@@ -456,48 +522,71 @@ inline PropertyValue &PropertyValue::operator=(PropertyValue &&other) noexcept {
         break;
       case Type::String:
         string_v.val_ = std::move(other.string_v.val_);
-        std::destroy_at(&other.string_v.val_);
         break;
       case Type::List:
         list_v.val_ = std::move(other.list_v.val_);
-        std::destroy_at(&other.list_v.val_);
         break;
       case Type::Map:
         map_v.val_ = std::move(other.map_v.val_);
-        std::destroy_at(&other.map_v.val_);
         break;
       case Type::TemporalData:
         temporal_data_v.val_ = other.temporal_data_v.val_;
         break;
     }
-    other.type_ = Type::Null;
     return *this;
   } else {
-    std::destroy_at(this);
-    return *std::construct_at(std::launder(this), std::move(other));
-  }
-}
+    // destroy
+    switch (type_) {
+      case Type::Null:
+        break;
+      case Type::Bool:
+        break;
+      case Type::Int:
+        break;
+      case Type::Double:
+        break;
+      case Type::String:
+        std::destroy_at(&string_v.val_);
+        break;
+      case Type::List:
+        std::destroy_at(&list_v.val_);
+        break;
+      case Type::Map:
+        std::destroy_at(&map_v.val_);
+        break;
+      case Type::TemporalData:
+        break;
+    }
+    // construct from other (the moved-from value is left valid; no need to destroy it)
+    auto *new_this = std::launder(this);
+    switch (other.type_) {
+      case Type::Null:
+        break;
+      case Type::Bool:
+        new_this->bool_v.val_ = other.bool_v.val_;
+        break;
+      case Type::Int:
+        new_this->int_v.val_ = other.int_v.val_;
+        break;
+      case Type::Double:
+        new_this->double_v.val_ = other.double_v.val_;
+        break;
+      case Type::String:
+        std::construct_at(&new_this->string_v.val_, std::move(other.string_v.val_));
+        break;
+      case Type::List:
+        std::construct_at(&new_this->list_v.val_, std::move(other.list_v.val_));
+        break;
+      case Type::Map:
+        std::construct_at(&new_this->map_v.val_, std::move(other.map_v.val_));
+        break;
+      case Type::TemporalData:
+        new_this->temporal_data_v.val_ = other.temporal_data_v.val_;
+        break;
+    }
 
-inline void PropertyValue::DestroyValue() noexcept {
-  switch (std::exchange(type_, Type::Null)) {
-    // destructor for primitive types does nothing
-    case Type::Null:
-    case Type::Bool:
-    case Type::Int:
-    case Type::Double:
-    case Type::TemporalData:
-      return;
-
-    // destructor for non primitive types since we used placement new
-    case Type::String:
-      std::destroy_at(&string_v.val_);
-      return;
-    case Type::List:
-      std::destroy_at(&list_v.val_);
-      return;
-    case Type::Map:
-      std::destroy_at(&map_v.val_);
-      return;
+    new_this->type_ = other.type_;
+    return *new_this;
   }
 }
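
The assignment operators above follow a standard tagged-union pattern: when source and destination hold the same alternative, assign through the existing member so containers such as std::string can reuse their allocated buffers; only when the alternatives differ is the active member destroyed and a new one constructed in place. A minimal sketch of the same idea, using a hypothetical two-alternative Value class (not Memgraph's PropertyValue):

    #include <memory>
    #include <string>
    #include <utility>

    class Value {
     public:
      enum class Type { Int, String };

      explicit Value(int i) : type_(Type::Int), int_v(i) {}
      explicit Value(std::string s) : type_(Type::String), string_v(std::move(s)) {}

      Value(const Value &other) : type_(other.type_) {
        if (type_ == Type::String) {
          std::construct_at(&string_v, other.string_v);
        } else {
          int_v = other.int_v;
        }
      }

      Value &operator=(const Value &other) {
        if (this == &other) return *this;
        if (type_ == other.type_) {
          // Fast path: same alternative; string assignment may reuse the
          // existing heap buffer instead of reallocating.
          if (type_ == Type::String) {
            string_v = other.string_v;
          } else {
            int_v = other.int_v;
          }
          return *this;
        }
        // Slow path: destroy the active member, then construct the new one.
        // Note: like the code above, this is not strongly exception-safe; a
        // throwing construction leaves the object in a destroyed state.
        if (type_ == Type::String) std::destroy_at(&string_v);
        if (other.type_ == Type::String) {
          std::construct_at(&string_v, other.string_v);
        } else {
          int_v = other.int_v;
        }
        type_ = other.type_;
        return *this;
      }

      ~Value() {
        if (type_ == Type::String) std::destroy_at(&string_v);
      }

     private:
      Type type_;
      union {
        int int_v;
        std::string string_v;
      };
    };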
 
diff --git a/src/utils/memory.cpp b/src/utils/memory.cpp
index d09f70fc3..6b1f26c11 100644
--- a/src/utils/memory.cpp
+++ b/src/utils/memory.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -150,128 +150,133 @@ void *MonotonicBufferResource::DoAllocate(size_t bytes, size_t alignment) {
 
 namespace impl {
 
-Pool::Pool(size_t block_size, unsigned char blocks_per_chunk, MemoryResource *memory)
-    : blocks_per_chunk_(blocks_per_chunk), block_size_(block_size), chunks_(memory) {}
-
-Pool::~Pool() { MG_ASSERT(chunks_.empty(), "You need to call Release before destruction!"); }
-
-void *Pool::Allocate() {
-  auto allocate_block_from_chunk = [this](Chunk *chunk) {
-    unsigned char *available_block = chunk->data + (chunk->first_available_block_ix * block_size_);
-    // Update free-list pointer (index in our case) by reading "next" from the
-    // available_block.
-    chunk->first_available_block_ix = *available_block;
-    --chunk->blocks_available;
-    return available_block;
-  };
-  if (last_alloc_chunk_ && last_alloc_chunk_->blocks_available > 0U)
-    return allocate_block_from_chunk(last_alloc_chunk_);
-  // Find a Chunk with available memory.
-  for (auto &chunk : chunks_) {
-    if (chunk.blocks_available > 0U) {
-      last_alloc_chunk_ = &chunk;
-      return allocate_block_from_chunk(last_alloc_chunk_);
-    }
-  }
-  // We haven't found a Chunk with available memory, so allocate a new one.
-  if (block_size_ > std::numeric_limits<size_t>::max() / blocks_per_chunk_) throw BadAlloc("Allocation size overflow");
-  size_t data_size = blocks_per_chunk_ * block_size_;
+Pool::Pool(size_t block_size, unsigned char blocks_per_chunk, MemoryResource *chunk_memory)
+    : blocks_per_chunk_(blocks_per_chunk), block_size_(block_size), chunks_(chunk_memory) {
   // Use the next pow2 of block_size_ as alignment, so that we cover alignment
   // requests between 1 and block_size_. Users of this class should make sure
   // that requested alignment of particular blocks is never greater than the
   // block itself.
-  size_t alignment = Ceil2(block_size_);
-  if (alignment < block_size_) throw BadAlloc("Allocation alignment overflow");
-  auto *data = reinterpret_cast<unsigned char *>(GetUpstreamResource()->Allocate(data_size, alignment));
-  // Form a free-list of blocks in data.
-  for (unsigned char i = 0U; i < blocks_per_chunk_; ++i) {
-    *(data + (i * block_size_)) = i + 1U;
-  }
-  Chunk chunk{data, 0, blocks_per_chunk_};
-  // Insert the big block in the sorted position.
-  auto it = std::lower_bound(chunks_.begin(), chunks_.end(), chunk,
-                             [](const auto &a, const auto &b) { return a.data < b.data; });
-  try {
-    it = chunks_.insert(it, chunk);
-  } catch (...) {
-    GetUpstreamResource()->Deallocate(data, data_size, alignment);
-    throw;
-  }
+  if (block_size_ > std::numeric_limits<size_t>::max() / blocks_per_chunk_) throw BadAlloc("Allocation size overflow");
+}
 
-  last_alloc_chunk_ = &*it;
-  last_dealloc_chunk_ = &*it;
-  return allocate_block_from_chunk(last_alloc_chunk_);
+Pool::~Pool() {
+  if (!chunks_.empty()) {
+    auto *resource = GetUpstreamResource();
+    auto const data_size = blocks_per_chunk_ * block_size_;
+    auto const alignment = Ceil2(block_size_);
+    for (auto &chunk : chunks_) {
+      resource->Deallocate(chunk.raw_data, data_size, alignment);
+    }
+    chunks_.clear();
+  }
+  free_list_ = nullptr;
+}
+
+void *Pool::Allocate() {
+  if (!free_list_) [[unlikely]] {
+    // No free blocks left: allocate a new chunk and thread its blocks onto
+    // the free list.
+    auto const data_size = blocks_per_chunk_ * block_size_;
+    auto const alignment = Ceil2(block_size_);
+    auto *resource = GetUpstreamResource();
+    auto *data = reinterpret_cast<std::byte *>(resource->Allocate(data_size, alignment));
+    try {
+      auto &new_chunk = chunks_.emplace_front(data);
+      free_list_ = new_chunk.build_freelist(block_size_, blocks_per_chunk_);
+    } catch (...) {
+      resource->Deallocate(data, data_size, alignment);
+      throw;
+    }
+  }
+  // Pop the head of the free list; each free block stores the pointer to the
+  // next free block in its first bytes.
+  return std::exchange(free_list_, *reinterpret_cast<std::byte **>(free_list_));
 }
 
 void Pool::Deallocate(void *p) {
-  MG_ASSERT(last_dealloc_chunk_, "No chunk to deallocate");
-  MG_ASSERT(!chunks_.empty(),
-            "Expected a call to Deallocate after at least a "
-            "single Allocate has been done.");
-  auto is_in_chunk = [this, p](const Chunk &chunk) {
-    auto ptr = reinterpret_cast<uintptr_t>(p);
-    size_t data_size = blocks_per_chunk_ * block_size_;
-    return reinterpret_cast<uintptr_t>(chunk.data) <= ptr && ptr < reinterpret_cast<uintptr_t>(chunk.data + data_size);
-  };
-  auto deallocate_block_from_chunk = [this, p](Chunk *chunk) {
-    // NOTE: This check is not enough to cover all double-free issues.
-    MG_ASSERT(chunk->blocks_available < blocks_per_chunk_,
-              "Deallocating more blocks than a chunk can contain, possibly a "
-              "double-free situation or we have a bug in the allocator.");
-    // Link the block into the free-list
-    auto *block = reinterpret_cast<unsigned char *>(p);
-    *block = chunk->first_available_block_ix;
-    chunk->first_available_block_ix = (block - chunk->data) / block_size_;
-    chunk->blocks_available++;
-  };
-  if (is_in_chunk(*last_dealloc_chunk_)) {
-    deallocate_block_from_chunk(last_dealloc_chunk_);
-    return;
-  }
-
-  // Find the chunk which served this allocation
-  Chunk chunk{reinterpret_cast<unsigned char *>(p) - blocks_per_chunk_ * block_size_, 0, 0};
-  auto it = std::lower_bound(chunks_.begin(), chunks_.end(), chunk,
-                             [](const auto &a, const auto &b) { return a.data <= b.data; });
-  MG_ASSERT(it != chunks_.end(), "Failed deallocation in utils::Pool");
-  MG_ASSERT(is_in_chunk(*it), "Failed deallocation in utils::Pool");
-
-  // Update last_alloc_chunk_ as well because it now has a free block.
-  // Additionally this corresponds with C++ pattern of allocations and
-  // deallocations being done in reverse order.
-  last_alloc_chunk_ = &*it;
-  last_dealloc_chunk_ = &*it;
-  deallocate_block_from_chunk(last_dealloc_chunk_);
-  // TODO: We could release the Chunk to upstream memory
-}
-
-void Pool::Release() {
-  for (auto &chunk : chunks_) {
-    size_t data_size = blocks_per_chunk_ * block_size_;
-    size_t alignment = Ceil2(block_size_);
-    GetUpstreamResource()->Deallocate(chunk.data, data_size, alignment);
-  }
-  chunks_.clear();
-  last_alloc_chunk_ = nullptr;
-  last_dealloc_chunk_ = nullptr;
+  // Push the block onto the head of the free list; O(1), no search for the
+  // owning chunk.
+  *reinterpret_cast<std::byte **>(p) = std::exchange(free_list_, reinterpret_cast<std::byte *>(p));
 }
 
 }  // namespace impl
 
-PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools,
-                           MemoryResource *memory_unpooled)
-    : pools_(memory_pools),
-      unpooled_(memory_unpooled),
-      max_blocks_per_chunk_(std::min(max_blocks_per_chunk, static_cast<size_t>(impl::Pool::MaxBlocksInChunk()))),
-      max_block_size_(max_block_size) {
-  MG_ASSERT(max_blocks_per_chunk_ > 0U, "Invalid number of blocks per chunk");
-  MG_ASSERT(max_block_size_ > 0U, "Invalid size of block");
+struct NullMemoryResourceImpl final : public MemoryResource {
+  NullMemoryResourceImpl() = default;
+  NullMemoryResourceImpl(NullMemoryResourceImpl const &) = default;
+  NullMemoryResourceImpl &operator=(NullMemoryResourceImpl const &) = default;
+  NullMemoryResourceImpl(NullMemoryResourceImpl &&) = default;
+  NullMemoryResourceImpl &operator=(NullMemoryResourceImpl &&) = default;
+  ~NullMemoryResourceImpl() override = default;
+
+ private:
+  void *DoAllocate(size_t /*bytes*/, size_t /*alignment*/) override {
+    throw BadAlloc{"NullMemoryResource doesn't allocate"};
+  }
+  void DoDeallocate(void * /*p*/, size_t /*bytes*/, size_t /*alignment*/) override {
+    throw BadAlloc{"NullMemoryResource doesn't deallocate"};
+  }
+  bool DoIsEqual(MemoryResource const &other) const noexcept override {
+    return dynamic_cast<NullMemoryResourceImpl const *>(&other) != nullptr;
+  }
+};
+
+MemoryResource *NullMemoryResource() noexcept {
+  static auto res = NullMemoryResourceImpl{};
+  return &res;
 }
 
+namespace impl {
+
+/// 1-bit sensitivity test
+static_assert(bin_index<1>(9U) == 0);
+static_assert(bin_index<1>(10U) == 0);
+static_assert(bin_index<1>(11U) == 0);
+static_assert(bin_index<1>(12U) == 0);
+static_assert(bin_index<1>(13U) == 0);
+static_assert(bin_index<1>(14U) == 0);
+static_assert(bin_index<1>(15U) == 0);
+static_assert(bin_index<1>(16U) == 0);
+
+static_assert(bin_index<1>(17U) == 1);
+static_assert(bin_index<1>(18U) == 1);
+static_assert(bin_index<1>(19U) == 1);
+static_assert(bin_index<1>(20U) == 1);
+static_assert(bin_index<1>(21U) == 1);
+static_assert(bin_index<1>(22U) == 1);
+static_assert(bin_index<1>(23U) == 1);
+static_assert(bin_index<1>(24U) == 1);
+static_assert(bin_index<1>(25U) == 1);
+static_assert(bin_index<1>(26U) == 1);
+static_assert(bin_index<1>(27U) == 1);
+static_assert(bin_index<1>(28U) == 1);
+static_assert(bin_index<1>(29U) == 1);
+static_assert(bin_index<1>(30U) == 1);
+static_assert(bin_index<1>(31U) == 1);
+static_assert(bin_index<1>(32U) == 1);
+
+/// 2-bit sensitivity test
+
+static_assert(bin_index<2>(9U) == 0);
+static_assert(bin_index<2>(10U) == 0);
+static_assert(bin_index<2>(11U) == 0);
+static_assert(bin_index<2>(12U) == 0);
+
+static_assert(bin_index<2>(13U) == 1);
+static_assert(bin_index<2>(14U) == 1);
+static_assert(bin_index<2>(15U) == 1);
+static_assert(bin_index<2>(16U) == 1);
+
+static_assert(bin_index<2>(17U) == 2);
+static_assert(bin_index<2>(18U) == 2);
+static_assert(bin_index<2>(19U) == 2);
+static_assert(bin_index<2>(20U) == 2);
+static_assert(bin_index<2>(21U) == 2);
+static_assert(bin_index<2>(22U) == 2);
+static_assert(bin_index<2>(23U) == 2);
+static_assert(bin_index<2>(24U) == 2);
+
+}  // namespace impl
+
 void *PoolResource::DoAllocate(size_t bytes, size_t alignment) {
   // Take the max of `bytes` and `alignment` so that we simplify handling
   // alignment requests.
-  size_t block_size = std::max(bytes, alignment);
+  size_t block_size = std::max({bytes, alignment, 1UL});
   // Check that we have received a regular allocation request with non-padded
   // structs/classes in play. These will always have
   // `sizeof(T) % alignof(T) == 0`. Special requests which don't have that
@@ -279,80 +284,36 @@ void *PoolResource::DoAllocate(size_t bytes, size_t alignment) {
   // have to write a general-purpose allocator which has to behave as complex
   // as malloc/free.
   if (block_size % alignment != 0) throw BadAlloc("Requested bytes must be a multiple of alignment");
-  if (block_size > max_block_size_) {
-    // Allocate a big block.
-    BigBlock big_block{bytes, alignment, GetUpstreamResourceBlocks()->Allocate(bytes, alignment)};
-    // Insert the big block in the sorted position.
-    auto it = std::lower_bound(unpooled_.begin(), unpooled_.end(), big_block,
-                               [](const auto &a, const auto &b) { return a.data < b.data; });
-    try {
-      unpooled_.insert(it, big_block);
-    } catch (...) {
-      GetUpstreamResourceBlocks()->Deallocate(big_block.data, bytes, alignment);
-      throw;
-    }
-    return big_block.data;
-  }
-  // Allocate a regular block, first check if last_alloc_pool_ is suitable.
-  if (last_alloc_pool_ && last_alloc_pool_->GetBlockSize() == block_size) {
-    return last_alloc_pool_->Allocate();
-  }
-  // Find the pool with greater or equal block_size.
-  impl::Pool pool(block_size, max_blocks_per_chunk_, GetUpstreamResource());
-  auto it = std::lower_bound(pools_.begin(), pools_.end(), pool,
-                             [](const auto &a, const auto &b) { return a.GetBlockSize() < b.GetBlockSize(); });
-  if (it != pools_.end() && it->GetBlockSize() == block_size) {
-    last_alloc_pool_ = &*it;
-    last_dealloc_pool_ = &*it;
-    return it->Allocate();
-  }
-  // We don't have a pool for this block_size, so insert it in the sorted
-  // position.
-  it = pools_.emplace(it, std::move(pool));
-  last_alloc_pool_ = &*it;
-  last_dealloc_pool_ = &*it;
-  return it->Allocate();
-}
 
+  if (block_size <= 64) {
+    return mini_pools_[(block_size - 1UL) / 8UL].Allocate();
+  }
+  if (block_size <= 128) {
+    return pools_3bit_.allocate(block_size);
+  }
+  if (block_size <= 512) {
+    return pools_4bit_.allocate(block_size);
+  }
+  if (block_size <= 1024) {
+    return pools_5bit_.allocate(block_size);
+  }
+  return unpooled_memory_->Allocate(bytes, alignment);
+}
+
 void PoolResource::DoDeallocate(void *p, size_t bytes, size_t alignment) {
-  size_t block_size = std::max(bytes, alignment);
-  MG_ASSERT(block_size % alignment == 0,
-            "PoolResource shouldn't serve allocation requests where bytes aren't "
-            "a multiple of alignment");
-  if (block_size > max_block_size_) {
-    // Deallocate a big block.
-    BigBlock big_block{bytes, alignment, p};
-    auto it = std::lower_bound(unpooled_.begin(), unpooled_.end(), big_block,
-                               [](const auto &a, const auto &b) { return a.data < b.data; });
-    MG_ASSERT(it != unpooled_.end(), "Failed deallocation");
-    MG_ASSERT(it->data == p && it->bytes == bytes && it->alignment == alignment, "Failed deallocation");
-    unpooled_.erase(it);
-    GetUpstreamResourceBlocks()->Deallocate(p, bytes, alignment);
-    return;
+  size_t block_size = std::max({bytes, alignment, 1UL});
+  DMG_ASSERT(block_size % alignment == 0);
+
+  if (block_size <= 64) {
+    mini_pools_[(block_size - 1UL) / 8UL].Deallocate(p);
+  } else if (block_size <= 128) {
+    pools_3bit_.deallocate(p, block_size);
+  } else if (block_size <= 512) {
+    pools_4bit_.deallocate(p, block_size);
+  } else if (block_size <= 1024) {
+    pools_5bit_.deallocate(p, block_size);
+  } else {
+    unpooled_memory_->Deallocate(p, bytes, alignment);
   }
-  // Deallocate a regular block, first check if last_dealloc_pool_ is suitable.
-  if (last_dealloc_pool_ && last_dealloc_pool_->GetBlockSize() == block_size) return last_dealloc_pool_->Deallocate(p);
-  // Find the pool with equal block_size.
-  impl::Pool pool(block_size, max_blocks_per_chunk_, GetUpstreamResource());
-  auto it = std::lower_bound(pools_.begin(), pools_.end(), pool,
-                             [](const auto &a, const auto &b) { return a.GetBlockSize() < b.GetBlockSize(); });
-  MG_ASSERT(it != pools_.end(), "Failed deallocation");
-  MG_ASSERT(it->GetBlockSize() == block_size, "Failed deallocation");
-  last_alloc_pool_ = &*it;
-  last_dealloc_pool_ = &*it;
-  return it->Deallocate(p);
 }
-
-void PoolResource::Release() {
-  for (auto &pool : pools_) pool.Release();
-  pools_.clear();
-  for (auto &big_block : unpooled_)
-    GetUpstreamResourceBlocks()->Deallocate(big_block.data, big_block.bytes, big_block.alignment);
-  unpooled_.clear();
-  last_alloc_pool_ = nullptr;
-  last_dealloc_pool_ = nullptr;
-}
-
-// PoolResource END
-
+
+bool PoolResource::DoIsEqual(MemoryResource const &other) const noexcept { return this == &other; }
 }  // namespace memgraph::utils
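
The O(1) claim for Allocate/Deallocate rests on the intrusive free list seen above: each free block stores, in its own first bytes, the pointer to the next free block, so no bookkeeping is needed outside the blocks themselves. A standalone sketch of the technique (assuming blocks of at least pointer size and alignment, which the pools guarantee by rounding sizes up to their bin sizes):

    #include <cstddef>
    #include <cstdlib>
    #include <utility>

    struct FreeList {
      std::byte *head = nullptr;

      // Thread every block of a chunk onto the list; each free block stores
      // the pointer to the next free block in its first bytes.
      void add_chunk(std::byte *data, std::size_t block_size, std::size_t blocks) {
        for (std::size_t i = 0; i != blocks; ++i) {
          std::byte *block = data + i * block_size;
          *reinterpret_cast<std::byte **>(block) = std::exchange(head, block);
        }
      }

      // Pop the head in O(1). Precondition: head != nullptr (Pool::Allocate
      // above allocates a new chunk first in that case).
      void *allocate() { return std::exchange(head, *reinterpret_cast<std::byte **>(head)); }

      // Push the block back in O(1); no search for the owning chunk.
      void deallocate(void *p) {
        *reinterpret_cast<std::byte **>(p) = std::exchange(head, static_cast<std::byte *>(p));
      }
    };

    int main() {
      constexpr std::size_t kBlockSize = 16, kBlocks = 4;
      auto *chunk = static_cast<std::byte *>(std::malloc(kBlockSize * kBlocks));
      FreeList list;
      list.add_chunk(chunk, kBlockSize, kBlocks);
      void *a = list.allocate();
      void *b = list.allocate();
      list.deallocate(a);
      list.deallocate(b);
      std::free(chunk);
    }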
diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp
index 225a3b6a1..8ff6c3523 100644
--- a/src/utils/memory.hpp
+++ b/src/utils/memory.hpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -15,7 +15,11 @@
 
 #pragma once
 
+#include <climits>
 #include <cstddef>
+#include <cstdint>
+#include <forward_list>
+#include <list>
 #include <memory>
 #include <mutex>
 #include <new>
@@ -248,6 +252,8 @@ bool operator!=(const Allocator<T> &a, const Allocator<U> &b) {
   return !(a == b);
 }
 
+auto NullMemoryResource() noexcept -> MemoryResource *;
+
 /// Wraps std::pmr::memory_resource for use with our MemoryResource
 class StdMemoryResource final : public MemoryResource {
  public:
@@ -380,37 +386,45 @@ class MonotonicBufferResource final : public MemoryResource {
 
 namespace impl {
 
+template <class T>
+using AList = std::forward_list<T, Allocator<T>>;
+
 template <class T>
 using AVector = std::vector<T, Allocator<T>>;
 
 /// Holds a number of Chunks each serving blocks of particular size. When a
-/// Chunk runs out of available blocks, a new Chunk is allocated. The naming is
-/// taken from `libstdc++` implementation, but the implementation details are
-/// more similar to `FixedAllocator` described in "Small Object Allocation" from
-/// "Modern C++ Design".
+/// Chunk runs out of available blocks, a new Chunk is allocated.
 class Pool final {
   /// Holds a pointer into a chunk of memory which consists of equal sized
-  /// blocks. Each Chunk can handle `std::numeric_limits<unsigned char>::max()`
-  /// number of blocks. Blocks form a "free-list", where each unused block has
-  /// an embedded index to the next unused block.
+  /// blocks. Blocks form a "free-list"; each unused block stores a pointer
+  /// to the next unused block.
   struct Chunk {
-    unsigned char *data;
-    unsigned char first_available_block_ix;
-    unsigned char blocks_available;
+    // TODO: make blocks_per_chunk a per-chunk property (i.e. allow chunks to grow)
+    std::byte *raw_data;
+    explicit Chunk(std::byte *rawData) : raw_data(rawData) {}
+    std::byte *build_freelist(std::size_t block_size, std::size_t blocks_in_chunk) {
+      auto current = raw_data;
+      std::byte *prev = nullptr;
+      auto end = current + (blocks_in_chunk * block_size);
+      // Link each block to the block before it; the last block becomes the
+      // head of the returned free list and the first block its tail.
+      while (current != end) {
+        std::byte **list_entry = reinterpret_cast<std::byte **>(current);
+        *list_entry = std::exchange(prev, current);
+        current += block_size;
+      }
+      DMG_ASSERT(prev != nullptr);
+      return prev;
+    }
   };
 
-  unsigned char blocks_per_chunk_;
-  size_t block_size_;
-  AVector<Chunk> chunks_;
-  Chunk *last_alloc_chunk_{nullptr};
-  Chunk *last_dealloc_chunk_{nullptr};
+  std::byte *free_list_{nullptr};
+  uint8_t blocks_per_chunk_{};
+  std::size_t block_size_{};
+
+  AList<Chunk> chunks_;  // TODO: implement ourselves so we can do a fast Release (detect a monotonic upstream, then do nothing)
 
  public:
-  static constexpr auto MaxBlocksInChunk() {
-    return std::numeric_limits<decltype(Chunk::first_available_block_ix)>::max();
-  }
+  static constexpr auto MaxBlocksInChunk = std::numeric_limits<decltype(blocks_per_chunk_)>::max();
 
-  Pool(size_t block_size, unsigned char blocks_per_chunk, MemoryResource *memory);
+  Pool(size_t block_size, unsigned char blocks_per_chunk, MemoryResource *chunk_memory);
 
   Pool(const Pool &) = delete;
   Pool &operator=(const Pool &) = delete;
@@ -430,8 +444,147 @@ class Pool final {
   void *Allocate();
 
   void Deallocate(void *p);
+};
 
-  void Release();
+// C++ overloads for the compiler builtin clz (count leading zeros)
+constexpr auto clz(unsigned int x) { return __builtin_clz(x); }
+constexpr auto clz(unsigned long x) { return __builtin_clzl(x); }
+constexpr auto clz(unsigned long long x) { return __builtin_clzll(x); }
+
+template <typename T>
+constexpr auto bits_sizeof = sizeof(T) * CHAR_BIT;
+
+/// 0-based bit index of the most significant bit, assuming `n` != 0
+template <typename T>
+constexpr auto msb_index(T n) {
+  return bits_sizeof<T> - clz(n) - T(1);
+}
+
+/* This function will, in O(1) time, provide a bin index based on:
+ * B  - the number of most significant bits to be sensitive to
+ * LB - the exclusive lower bound: sizes at or below LB are not binned,
+ *      and bin_index(LB + 1) == 0
+ *
+ * let's say we are:
+ * - sensitive to two bits (B == 2)
+ * - lowest bin is for 8 (LB == 8)
+ *
+ * our bin indexes would look like:
+ *   0 - 0000'1100 12
+ *   1 - 0001'0000 16
+ *   2 - 0001'1000 24
+ *   3 - 0010'0000 32
+ *   4 - 0011'0000 48
+ *   5 - 0100'0000 64
+ *   6 - 0110'0000 96
+ *   7 - 1000'0000 128
+ *   8 - 1100'0000 192
+ *   ...
+ *
+ * Example:
+ * Given n == 70, we want to return the bin index to the first value which is
+ * larger than n.
+ * bin_index<2,8>(70) => 6, as 64 (index 5) < 70 and 70 <= 96 (index 6)
+ */
+template <std::size_t B = 2, std::size_t LB = 8>
+constexpr std::size_t bin_index(std::size_t n) {
+  static_assert(B >= 1U, "Needs to be sensitive to at least one bit");
+  static_assert(LB != 0U, "Lower bound needs to be non-zero");
+  DMG_ASSERT(n > LB);
+
+  // We will always be sensitive to at least the MSB
+  // exponent tells us how many bits we need to use to select within a level
+  constexpr auto kExponent = B - 1U;
+  // 2^exponent gives the size of each level
+  constexpr auto kSize = 1U << kExponent;
+  // kOffset adjusts results down so that bin_index(LB) == 0
+  constexpr auto kOffset = msb_index(LB);
+
+  auto const msb_idx = msb_index(n);
+  DMG_ASSERT(msb_idx != 0);
+
+  auto const mask = (1u << msb_idx) - 1u;
+  auto const under = n & mask;
+  auto const selector = under >> (msb_idx - kExponent);
+
+  auto const rest = under & (mask >> kExponent);
+  auto const no_overflow = rest == 0U;
+
+  auto const msb_level = kSize * (msb_idx - kOffset);
+  return msb_level + selector - no_overflow;
+}
+
+// This is the inverse operation of bin_index:
+// bin_size(bin_index(X)-1) < X <= bin_size(bin_index(X))
+template <std::size_t B = 2, std::size_t LB = 8>
+std::size_t bin_size(std::size_t idx) {
+  constexpr auto kExponent = B - 1U;
+  constexpr auto kSize = 1U << kExponent;
+  constexpr auto kOffset = msb_index(LB);
+
+  // no need to optimise `/` or `%`; the compiler can see `kSize` is a power of 2
+  auto const level = (idx + 1) / kSize;
+  auto const sub_level = (idx + 1) % kSize;
+  return (1U << (level + kOffset)) | (sub_level << (level + kOffset - kExponent));
+}
+
+template <std::size_t Bits, std::size_t LB, std::size_t UB>
+struct MultiPool {
+  static_assert(LB < UB, "lower bound must be less than upper bound");
+  static_assert(IsPow2(LB) && IsPow2(UB), "Design untested for non powers of 2");
+  static_assert((LB << Bits) % sizeof(void *) == 0, "Smallest pool must have space and alignment for freelist");
+
+  // upper bound is inclusive
+  static bool is_size_handled(std::size_t size) { return LB < size && size <= UB; }
+  static bool is_above_upper_bound(std::size_t size) { return UB < size; }
+
+  static constexpr auto n_bins = bin_index<Bits, LB>(UB) + 1U;
+
+  MultiPool(uint8_t blocks_per_chunk, MemoryResource *memory, MemoryResource *internal_memory)
+      : blocks_per_chunk_{blocks_per_chunk}, memory_{memory}, internal_memory_{internal_memory} {}
+
+  ~MultiPool() {
+    if (pools_) {
+      auto pool_alloc = Allocator<Pool>(internal_memory_);
+      for (auto i = 0U; i != n_bins; ++i) {
+        pool_alloc.destroy(&pools_[i]);
+      }
+      pool_alloc.deallocate(pools_, n_bins);
+    }
+  }
+
+  void *allocate(std::size_t bytes) {
+    auto idx = bin_index<Bits, LB>(bytes);
+    if (!pools_) [[unlikely]] {
+      initialise_pools();
+    }
+    return pools_[idx].Allocate();
+  }
+
+  void deallocate(void *ptr, std::size_t bytes) {
+    auto idx = bin_index<Bits, LB>(bytes);
+    pools_[idx].Deallocate(ptr);
+  }
+
+ private:
+  void initialise_pools() {
+    auto pool_alloc = Allocator<Pool>(internal_memory_);
+    auto pools = pool_alloc.allocate(n_bins);
+    try {
+      for (auto i = 0U; i != n_bins; ++i) {
+        auto block_size = bin_size<Bits, LB>(i);
+        pool_alloc.construct(&pools[i], block_size, blocks_per_chunk_, memory_);
+      }
+      pools_ = pools;
+    } catch (...) {
+      pool_alloc.deallocate(pools, n_bins);
+      throw;
+    }
+  }
+
+  Pool *pools_{};
+  uint8_t blocks_per_chunk_{};
+  MemoryResource *memory_{};
+  MemoryResource *internal_memory_{};
 };
 
 }  // namespace impl
@@ -442,8 +595,6 @@ class Pool final {
 ///
 /// This class has the following properties with regards to memory management.
 ///
-///   * All allocated memory will be freed upon destruction, even if Deallocate
-///     has not been called for some of the allocated blocks.
 ///   * It consists of a collection of impl::Pool instances, each serving
 ///     requests for different block sizes. Each impl::Pool manages a collection
 ///     of impl::Pool::Chunk instances which are divided into blocks of uniform
@@ -452,91 +603,46 @@ class Pool final {
 ///     arbitrary alignment requests. Each requested block size must be a
 ///     multiple of alignment or smaller than the alignment value.
 ///   * An allocation request within the limits of the maximum block size will
-///     find a Pool serving the requested size. If there's no Pool serving such
-///     a request, a new one is instantiated.
+///     find a Pool serving the requested size. Some requests will be served
+///     by a Pool with a larger block size.
 ///   * When a Pool exhausts its Chunk, a new one is allocated with the size for
 ///     the maximum number of blocks.
 ///   * Allocation requests which exceed the maximum block size will be
 ///     forwarded to upstream MemoryResource.
-///   * Maximum block size and maximum number of blocks per chunk can be tuned
-///     by passing the arguments to the constructor.
+///   * Maximum number of blocks per chunk can be tuned by passing the
+///     argument to the constructor.
 class PoolResource final : public MemoryResource {
  public:
-  /// Construct with given max_blocks_per_chunk, max_block_size and upstream
-  /// memory.
-  ///
-  /// The implementation will use std::min(max_blocks_per_chunk,
-  /// impl::Pool::MaxBlocksInChunk()) as the real maximum number of blocks per
-  /// chunk. Allocation requests exceeding max_block_size are simply forwarded
-  /// to upstream memory.
-  PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools = NewDeleteResource(),
-               MemoryResource *memory_unpooled = NewDeleteResource());
-
-  PoolResource(const PoolResource &) = delete;
-  PoolResource &operator=(const PoolResource &) = delete;
-
-  PoolResource(PoolResource &&) = default;
-  PoolResource &operator=(PoolResource &&) = default;
-
-  ~PoolResource() override { Release(); }
-
-  MemoryResource *GetUpstreamResource() const { return pools_.get_allocator().GetMemoryResource(); }
-  MemoryResource *GetUpstreamResourceBlocks() const { return unpooled_.get_allocator().GetMemoryResource(); }
-
-  /// Release all allocated memory.
-  void Release();
+  PoolResource(uint8_t blocks_per_chunk, MemoryResource *memory = NewDeleteResource(),
+               MemoryResource *internal_memory = NewDeleteResource())
+      : mini_pools_{
+            impl::Pool{8, blocks_per_chunk, memory},
+            impl::Pool{16, blocks_per_chunk, memory},
+            impl::Pool{24, blocks_per_chunk, memory},
+            impl::Pool{32, blocks_per_chunk, memory},
+            impl::Pool{40, blocks_per_chunk, memory},
+            impl::Pool{48, blocks_per_chunk, memory},
+            impl::Pool{56, blocks_per_chunk, memory},
+            impl::Pool{64, blocks_per_chunk, memory},
+        },
+        pools_3bit_(blocks_per_chunk, memory, internal_memory),
+        pools_4bit_(blocks_per_chunk, memory, internal_memory),
+        pools_5bit_(blocks_per_chunk, memory, internal_memory),
+        unpooled_memory_{internal_memory} {}
+  ~PoolResource() override = default;
 
  private:
-  // Big block larger than max_block_size_, doesn't go into a pool.
-  struct BigBlock {
-    size_t bytes;
-    size_t alignment;
-    void *data;
-  };
-
-  // TODO: Potential memory optimization is replacing `std::vector` with our
-  // custom vector implementation which doesn't store a `MemoryResource *`.
-  // Currently we have vectors for `pools_` and `unpooled_`, as well as each
-  // `impl::Pool` stores a `chunks_` vector.
-
-  // Pools are sorted by bound_size_, ascending.
-  impl::AVector<impl::Pool> pools_;
-  impl::Pool *last_alloc_pool_{nullptr};
-  impl::Pool *last_dealloc_pool_{nullptr};
-  // Unpooled BigBlocks are sorted by data pointer.
-  impl::AVector<BigBlock> unpooled_;
-  size_t max_blocks_per_chunk_;
-  size_t max_block_size_;
-
   void *DoAllocate(size_t bytes, size_t alignment) override;
-
   void DoDeallocate(void *p, size_t bytes, size_t alignment) override;
-
-  bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
-};
-
-/// Like PoolResource but uses SpinLock for thread safe usage.
-class SynchronizedPoolResource final : public MemoryResource {
- public:
-  SynchronizedPoolResource(size_t max_blocks_per_chunk, size_t max_block_size,
-                           MemoryResource *memory = NewDeleteResource())
-      : pool_memory_(max_blocks_per_chunk, max_block_size, memory) {}
+  bool DoIsEqual(MemoryResource const &other) const noexcept override;
 
  private:
-  PoolResource pool_memory_;
-  SpinLock lock_;
-
-  void *DoAllocate(size_t bytes, size_t alignment) override {
-    std::lock_guard<SpinLock> guard(lock_);
-    return pool_memory_.Allocate(bytes, alignment);
-  }
-
-  void DoDeallocate(void *p, size_t bytes, size_t alignment) override {
-    std::lock_guard<SpinLock> guard(lock_);
-    pool_memory_.Deallocate(p, bytes, alignment);
-  }
-
-  bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
+  std::array<impl::Pool, 8> mini_pools_;
+  impl::MultiPool<3, 64, 128> pools_3bit_;
+  impl::MultiPool<4, 128, 512> pools_4bit_;
+  impl::MultiPool<5, 512, 1024> pools_5bit_;
+  MemoryResource *unpooled_memory_;
 };
 
 class MemoryTrackingResource final : public utils::MemoryResource {
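
To make the bin arithmetic concrete, here is a standalone restatement of bin_index/bin_size with the asserts dropped; it is a sketch, assuming GCC or Clang on an LP64 platform (size_t as unsigned long, constant-foldable __builtin_clzl), and it reproduces the worked example from the comment block:

    #include <climits>
    #include <cstddef>
    #include <cstdio>

    // n must be non-zero; sizes at or below LB are not supported.
    constexpr std::size_t msb_index(std::size_t n) {
      return sizeof(n) * CHAR_BIT - __builtin_clzl(n) - 1;
    }

    template <std::size_t B = 2, std::size_t LB = 8>
    constexpr std::size_t bin_index(std::size_t n) {
      constexpr auto kExponent = B - 1;
      constexpr auto kSize = std::size_t{1} << kExponent;
      constexpr auto kOffset = msb_index(LB);
      auto const msb = msb_index(n);
      auto const mask = (std::size_t{1} << msb) - 1;
      auto const under = n & mask;
      auto const selector = under >> (msb - kExponent);
      auto const no_overflow = static_cast<std::size_t>((under & (mask >> kExponent)) == 0);
      return kSize * (msb - kOffset) + selector - no_overflow;
    }

    template <std::size_t B = 2, std::size_t LB = 8>
    constexpr std::size_t bin_size(std::size_t idx) {
      constexpr auto kExponent = B - 1;
      constexpr auto kSize = std::size_t{1} << kExponent;
      constexpr auto kOffset = msb_index(LB);
      auto const level = (idx + 1) / kSize;
      auto const sub_level = (idx + 1) % kSize;
      return (std::size_t{1} << (level + kOffset)) |
             (sub_level << (level + kOffset - kExponent));
    }

    // The worked example from the comment block: 64 (bin 5) < 70 <= 96 (bin 6).
    static_assert(bin_index<2, 8>(70) == 6);
    static_assert(bin_size<2, 8>(6) == 96);

    int main() {
      const std::size_t sizes[] = {9, 16, 17, 64, 70, 96, 128};
      for (auto n : sizes) {
        std::printf("%zu -> bin %zu (block size %zu)\n", n, bin_index<2, 8>(n),
                    bin_size<2, 8>(bin_index<2, 8>(n)));
      }
    }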
diff --git a/src/utils/tag.hpp b/src/utils/tag.hpp
new file mode 100644
index 000000000..dfd8c8f81
--- /dev/null
+++ b/src/utils/tag.hpp
@@ -0,0 +1,32 @@
+// Copyright 2024 Memgraph Ltd.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+// License, and you may not use this file except in compliance with the Business Source License.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+#pragma once
+
+namespace memgraph::utils {
+
+template <typename T>
+struct tag_type {
+  using type = T;
+};
+
+template <auto V>
+struct tag_value {
+  static constexpr auto value = V;
+};
+
+template <typename T>
+inline constexpr auto tag_t = tag_type<T>{};
+
+template <auto V>
+inline constexpr auto tag_v = tag_value<V>{};
+
+}  // namespace memgraph::utils
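
The tag helpers let callers pass a type or a compile-time value as an ordinary function argument, so overloads can dispatch on it without explicit template syntax at the call site. A hypothetical usage sketch (the include path and the print_* function names are illustrative, not from this patch):

    #include <cstdint>
    #include <cstdio>

    #include "utils/tag.hpp"  // assumed include path within the repo

    using memgraph::utils::tag_t;
    using memgraph::utils::tag_type;
    using memgraph::utils::tag_v;
    using memgraph::utils::tag_value;

    template <typename T>
    void print_size(tag_type<T> /*tag*/) {
      std::printf("sizeof = %zu\n", sizeof(T));
    }

    template <auto V>
    void print_value(tag_value<V> /*tag*/) {
      std::printf("value = %lld\n", static_cast<long long>(V));
    }

    int main() {
      print_size(tag_t<std::uint64_t>);  // dispatch on a type
      print_value(tag_v<42>);            // dispatch on a compile-time value
    }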
diff --git a/tests/benchmark/query/execution.cpp b/tests/benchmark/query/execution.cpp
index d49b14fc3..1d65cdb93 100644
--- a/tests/benchmark/query/execution.cpp
+++ b/tests/benchmark/query/execution.cpp
@@ -55,12 +55,12 @@ class NewDeleteResource final {
 };
 
 class PoolResource final {
-  memgraph::utils::PoolResource memory_{128, 4 * 1024};
+  memgraph::utils::PoolResource memory_{128};
 
  public:
   memgraph::utils::MemoryResource *get() { return &memory_; }
 
-  void Reset() { memory_.Release(); }
+  void Reset() { /* no-op: the new PoolResource has no Release(); memory is reclaimed on destruction */ }
 };
 
 static void AddVertices(memgraph::storage::Storage *db, int vertex_count) {
diff --git a/tests/benchmark/skip_list_vs_stl.cpp b/tests/benchmark/skip_list_vs_stl.cpp
index 1a17e56e1..9a856822f 100644
--- a/tests/benchmark/skip_list_vs_stl.cpp
+++ b/tests/benchmark/skip_list_vs_stl.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -101,8 +101,7 @@ class StdSetWithPoolAllocatorInsertFixture : public benchmark::Fixture {
   }
 
  protected:
-  memgraph::utils::PoolResource memory_{256U /* max_blocks_per_chunk */, 1024U /* max_block_size */,
-                                        memgraph::utils::NewDeleteResource()};
+  memgraph::utils::PoolResource memory_{128U /* blocks_per_chunk */, memgraph::utils::NewDeleteResource()};
   std::set<uint64_t, std::less<>, memgraph::utils::Allocator<uint64_t>> container{&memory_};
   memgraph::utils::SpinLock lock;
 };
@@ -208,8 +207,7 @@ class StdSetWithPoolAllocatorFindFixture : public benchmark::Fixture {
   }
 
  protected:
-  memgraph::utils::PoolResource memory_{256U /* max_blocks_per_chunk */, 1024U /* max_block_size */,
-                                        memgraph::utils::NewDeleteResource()};
+  memgraph::utils::PoolResource memory_{128U /* blocks_per_chunk */, memgraph::utils::NewDeleteResource()};
   std::set<uint64_t, std::less<>, memgraph::utils::Allocator<uint64_t>> container{&memory_};
   memgraph::utils::SpinLock lock;
 };
@@ -325,8 +323,7 @@ class StdMapWithPoolAllocatorInsertFixture : public benchmark::Fixture {
   }
 
  protected:
-  memgraph::utils::PoolResource memory_{256U /* max_blocks_per_chunk */, 1024U /* max_block_size */,
-                                        memgraph::utils::NewDeleteResource()};
+  memgraph::utils::PoolResource memory_{128U /* blocks_per_chunk */, memgraph::utils::NewDeleteResource()};
   std::map<uint64_t, uint64_t, std::less<>, memgraph::utils::Allocator<std::pair<const uint64_t, uint64_t>>> container{
       &memory_};
   memgraph::utils::SpinLock lock;
@@ -433,8 +430,7 @@ class StdMapWithPoolAllocatorFindFixture : public benchmark::Fixture {
   }
 
  protected:
-  memgraph::utils::PoolResource memory_{256U /* max_blocks_per_chunk */, 1024U /* max_block_size */,
-                                        memgraph::utils::NewDeleteResource()};
+  memgraph::utils::PoolResource memory_{128U /* blocks_per_chunk */, memgraph::utils::NewDeleteResource()};
   std::map<uint64_t, uint64_t, std::less<>, memgraph::utils::Allocator<std::pair<const uint64_t, uint64_t>>> container{
       &memory_};
   memgraph::utils::SpinLock lock;
diff --git a/tests/e2e/memory/workloads.yaml b/tests/e2e/memory/workloads.yaml
index bf29e484c..c043e03d8 100644
--- a/tests/e2e/memory/workloads.yaml
+++ b/tests/e2e/memory/workloads.yaml
@@ -52,26 +52,26 @@ in_memory_query_limit_cluster: &in_memory_query_limit_cluster
       setup_queries: []
       validation_queries: []
 
-args_450_MiB_limit: &args_450_MiB_limit
+args_350_MiB_limit: &args_350_MiB_limit
   - "--bolt-port"
   - *bolt_port
-  - "--memory-limit=450"
+  - "--memory-limit=350"
   - "--storage-gc-cycle-sec=180"
   - "--log-level=INFO"
 
-in_memory_450_MiB_limit_cluster: &in_memory_450_MiB_limit_cluster
+in_memory_350_MiB_limit_cluster: &in_memory_350_MiB_limit_cluster
   cluster:
     main:
-      args: *args_450_MiB_limit
+      args: *args_350_MiB_limit
       log_file: "memory-e2e.log"
       setup_queries: []
       validation_queries: []
 
 
-disk_450_MiB_limit_cluster: &disk_450_MiB_limit_cluster
+disk_350_MiB_limit_cluster: &disk_350_MiB_limit_cluster
   cluster:
     main:
-      args: *args_450_MiB_limit
+      args: *args_350_MiB_limit
       log_file: "memory-e2e.log"
       setup_queries: []
       validation_queries: []
@@ -192,22 +192,22 @@ workloads:
   - name: "Memory control for accumulation"
     binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation"
     args: ["--bolt-port", *bolt_port]
-    <<: *in_memory_450_MiB_limit_cluster
+    <<: *in_memory_350_MiB_limit_cluster
 
   - name: "Memory control for accumulation on disk storage"
     binary: "tests/e2e/memory/memgraph__e2e__memory__limit_accumulation"
     args: ["--bolt-port", *bolt_port]
-    <<: *disk_450_MiB_limit_cluster
+    <<: *disk_350_MiB_limit_cluster
 
   - name: "Memory control for edge create"
     binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create"
     args: ["--bolt-port", *bolt_port]
-    <<: *in_memory_450_MiB_limit_cluster
+    <<: *in_memory_350_MiB_limit_cluster
 
   - name: "Memory control for edge create on disk storage"
     binary: "tests/e2e/memory/memgraph__e2e__memory__limit_edge_create"
     args: ["--bolt-port", *bolt_port]
-    <<: *disk_450_MiB_limit_cluster
+    <<: *disk_350_MiB_limit_cluster
 
   - name: "Memory control for memory limit global thread alloc"
     binary: "tests/e2e/memory/memgraph__e2e__memory_limit_global_thread_alloc_proc"
diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py
index e1f52b696..155ceac06 100644
--- a/tests/mgbench/runners.py
+++ b/tests/mgbench/runners.py
@@ -416,6 +416,7 @@ class Memgraph(BaseRunner):
     def __init__(self, benchmark_context: BenchmarkContext):
         super().__init__(benchmark_context=benchmark_context)
         self._memgraph_binary = benchmark_context.vendor_binary
+        self._bolt_num_workers = benchmark_context.num_workers_for_benchmark
         self._performance_tracking = benchmark_context.performance_tracking
         self._directory = tempfile.TemporaryDirectory(dir=benchmark_context.temporary_directory)
         self._vendor_args = benchmark_context.vendor_args
@@ -440,6 +441,7 @@ class Memgraph(BaseRunner):
         kwargs["bolt_port"] = self._bolt_port
         kwargs["data_directory"] = data_directory
         kwargs["storage_properties_on_edges"] = True
+        kwargs["bolt_num_workers"] = self._bolt_num_workers
         for key, value in self._vendor_args.items():
             kwargs[key] = value
         return _convert_args_to_flags(self._memgraph_binary, **kwargs)
diff --git a/tests/unit/property_value_v2.cpp b/tests/unit/property_value_v2.cpp
index aba322ce7..28937598e 100644
--- a/tests/unit/property_value_v2.cpp
+++ b/tests/unit/property_value_v2.cpp
@@ -1,4 +1,4 @@
-// Copyright 2022 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -570,7 +570,6 @@ TEST(PropertyValue, MoveConstructor) {
   for (auto &item : data) {
     memgraph::storage::PropertyValue copy(item);
     memgraph::storage::PropertyValue pv(std::move(item));
-    ASSERT_EQ(item.type(), memgraph::storage::PropertyValue::Type::Null);
     ASSERT_EQ(pv.type(), copy.type());
     switch (copy.type()) {
       case memgraph::storage::PropertyValue::Type::Null:
@@ -668,7 +667,6 @@ TEST(PropertyValue, MoveAssignment) {
     memgraph::storage::PropertyValue copy(item);
     memgraph::storage::PropertyValue pv(123);
     pv = std::move(item);
-    ASSERT_EQ(item.type(), memgraph::storage::PropertyValue::Type::Null);
     ASSERT_EQ(pv.type(), copy.type());
     switch (copy.type()) {
       case memgraph::storage::PropertyValue::Type::Null:
diff --git a/tests/unit/utils_memory.cpp b/tests/unit/utils_memory.cpp
index 5173a5f7b..e46c6c1f9 100644
--- a/tests/unit/utils_memory.cpp
+++ b/tests/unit/utils_memory.cpp
@@ -1,4 +1,4 @@
-// Copyright 2023 Memgraph Ltd.
+// Copyright 2024 Memgraph Ltd.
 //
 // Use of this software is governed by the Business Source License
 // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@@ -195,134 +195,6 @@ TEST(MonotonicBufferResource, AllocationWithInitialBufferOnStack) {
   }
 }
 
-// NOLINTNEXTLINE(hicpp-special-member-functions)
-TEST(PoolResource, SingleSmallBlockAllocations) {
-  TestMemory test_mem;
-  const size_t max_blocks_per_chunk = 3U;
-  const size_t max_block_size = 64U;
-  memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
-  // Fill the first chunk.
-  CheckAllocation(&mem, 64U, 1U);
-  // May allocate more than once due to bookkeeping.
-  EXPECT_GE(test_mem.new_count_, 1U);
-  // Reset tracking and continue filling the first chunk.
-  test_mem.new_count_ = 0U;
-  CheckAllocation(&mem, 64U, 64U);
-  CheckAllocation(&mem, 64U);
-  EXPECT_EQ(test_mem.new_count_, 0U);
-  // Reset tracking and fill the second chunk
-  test_mem.new_count_ = 0U;
-  CheckAllocation(&mem, 64U, 32U);
-  auto *ptr1 = CheckAllocation(&mem, 32U, 64U);  // this will become 64b block
-  auto *ptr2 = CheckAllocation(&mem, 64U, 32U);
-  // We expect one allocation for chunk and at most one for bookkeeping.
-  EXPECT_TRUE(test_mem.new_count_ >= 1U && test_mem.new_count_ <= 2U);
-  test_mem.delete_count_ = 0U;
-  mem.Deallocate(ptr1, 32U, 64U);
-  mem.Deallocate(ptr2, 64U, 32U);
-  EXPECT_EQ(test_mem.delete_count_, 0U);
-  mem.Release();
-  EXPECT_GE(test_mem.delete_count_, 2U);
-  CheckAllocation(&mem, 64U, 1U);
-}
-
-// NOLINTNEXTLINE(hicpp-special-member-functions)
-TEST(PoolResource, MultipleSmallBlockAllocations) {
-  TestMemory test_mem;
-  const size_t max_blocks_per_chunk = 1U;
-  const size_t max_block_size = 64U;
-  memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
-  CheckAllocation(&mem, 64U);
-  CheckAllocation(&mem, 18U, 2U);
-  CheckAllocation(&mem, 24U, 8U);
-  // May allocate more than once per chunk due to bookkeeping.
-  EXPECT_GE(test_mem.new_count_, 3U);
-  // Reset tracking and fill the second chunk
-  test_mem.new_count_ = 0U;
-  CheckAllocation(&mem, 64U);
-  CheckAllocation(&mem, 18U, 2U);
-  CheckAllocation(&mem, 24U, 8U);
-  // We expect one allocation for chunk and at most one for bookkeeping.
-  EXPECT_TRUE(test_mem.new_count_ >= 3U && test_mem.new_count_ <= 6U);
-  mem.Release();
-  EXPECT_GE(test_mem.delete_count_, 6U);
-  CheckAllocation(&mem, 64U);
-}
-
-// NOLINTNEXTLINE(hicpp-special-member-functions)
-TEST(PoolResource, BigBlockAllocations) {
-  TestMemory test_mem;
-  TestMemory test_mem_unpooled;
-  const size_t max_blocks_per_chunk = 3U;
-  const size_t max_block_size = 64U;
-  memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem, &test_mem_unpooled);
-  CheckAllocation(&mem, max_block_size + 1, 1U);
-  // May allocate more than once per block due to bookkeeping.
-  EXPECT_GE(test_mem_unpooled.new_count_, 1U);
-  CheckAllocation(&mem, max_block_size + 1, 1U);
-  EXPECT_GE(test_mem_unpooled.new_count_, 2U);
-  auto *ptr = CheckAllocation(&mem, max_block_size * 2, 1U);
-  EXPECT_GE(test_mem_unpooled.new_count_, 3U);
-  mem.Deallocate(ptr, max_block_size * 2, 1U);
-  EXPECT_GE(test_mem_unpooled.delete_count_, 1U);
-  mem.Release();
-  EXPECT_GE(test_mem_unpooled.delete_count_, 3U);
-  CheckAllocation(&mem, max_block_size + 1, 1U);
-}
-
-// NOLINTNEXTLINE(hicpp-special-member-functions)
-TEST(PoolResource, BlockSizeIsNotMultipleOfAlignment) {
-  const size_t max_blocks_per_chunk = 3U;
-  const size_t max_block_size = 64U;
-  memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size);
-  EXPECT_THROW(mem.Allocate(64U, 24U), std::bad_alloc);
-  EXPECT_THROW(mem.Allocate(63U), std::bad_alloc);
-  EXPECT_THROW(mem.Allocate(max_block_size + 1, max_block_size), std::bad_alloc);
-}
-
-// NOLINTNEXTLINE(hicpp-special-member-functions)
-TEST(PoolResource, AllocationWithOverflow) {
-  {
-    const size_t max_blocks_per_chunk = 2U;
-    memgraph::utils::PoolResource mem(max_blocks_per_chunk, std::numeric_limits<size_t>::max());
-    EXPECT_THROW(mem.Allocate(std::numeric_limits<size_t>::max(), 1U), std::bad_alloc);
-    // Throws because initial chunk block is aligned to
-    // memgraph::utils::Ceil2(block_size), which wraps in this case.
-    EXPECT_THROW(mem.Allocate((std::numeric_limits<size_t>::max() - 1U) / max_blocks_per_chunk, 1U), std::bad_alloc);
-  }
-  {
-    const size_t max_blocks_per_chunk = memgraph::utils::impl::Pool::MaxBlocksInChunk();
-    memgraph::utils::PoolResource mem(max_blocks_per_chunk, std::numeric_limits<size_t>::max());
-    EXPECT_THROW(mem.Allocate(std::numeric_limits<size_t>::max(), 1U), std::bad_alloc);
-    // Throws because initial chunk block is aligned to
-    // memgraph::utils::Ceil2(block_size), which wraps in this case.
-    EXPECT_THROW(mem.Allocate((std::numeric_limits<size_t>::max() - 1U) / max_blocks_per_chunk, 1U), std::bad_alloc);
-  }
-}
-
-TEST(PoolResource, BlockDeallocation) {
-  TestMemory test_mem;
-  const size_t max_blocks_per_chunk = 2U;
-  const size_t max_block_size = 64U;
-  memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
-  auto *ptr = CheckAllocation(&mem, max_block_size);
-  test_mem.new_count_ = 0U;
-  // Do another allocation before deallocating `ptr`, so that we are sure that
-  // the chunk of 2 blocks is still alive and therefore `ptr` may be reused when
-  // it's deallocated. If we deallocate now, the implementation may choose to
-  // free the whole chunk, and we do not want that for the purposes of this
-  // test.
-  CheckAllocation(&mem, max_block_size);
-  EXPECT_EQ(test_mem.new_count_, 0U);
-  EXPECT_EQ(test_mem.delete_count_, 0U);
-  mem.Deallocate(ptr, max_block_size);
-  EXPECT_EQ(test_mem.delete_count_, 0U);
-  // CheckAllocation(&mem, max_block_size) will fail as PoolResource should
-  // reuse free blocks.
-  EXPECT_EQ(ptr, mem.Allocate(max_block_size));
-  EXPECT_EQ(test_mem.new_count_, 0U);
-}
-
 class AllocationTrackingMemory final : public memgraph::utils::MemoryResource {
  public:
   std::vector<size_t> allocated_sizes_;

From 0ed2d18754157ff27f915ab8977c5d78df37405b Mon Sep 17 00:00:00 2001
From: Aidar Samerkhanov <aidar.samerkhanov@memgraph.io>
Date: Fri, 15 Mar 2024 11:39:37 +0400
Subject: [PATCH 2/2] Add RollUpApply operator support to edge type index
 rewrite. (#1816)

---
 src/query/plan/rewrite/edge_type_index_lookup.hpp | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/src/query/plan/rewrite/edge_type_index_lookup.hpp b/src/query/plan/rewrite/edge_type_index_lookup.hpp
index ed8666513..893fef970 100644
--- a/src/query/plan/rewrite/edge_type_index_lookup.hpp
+++ b/src/query/plan/rewrite/edge_type_index_lookup.hpp
@@ -465,6 +465,18 @@ class EdgeTypeIndexRewriter final : public HierarchicalLogicalOperatorVisitor {
     return true;
   }
 
+  bool PreVisit(RollUpApply &op) override {
+    prev_ops_.push_back(&op);
+    op.input()->Accept(*this);
+    RewriteBranch(&op.list_collection_branch_);
+    return false;
+  }
+
+  bool PostVisit(RollUpApply &) override {
+    prev_ops_.pop_back();
+    return true;
+  }
+
   std::shared_ptr<LogicalOperator> new_root_;
 
  private: