From b5e255b8967fe6d60f9b9e04cb40c32c20adfaa8 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Mon, 23 Dec 2019 13:58:36 +0100
Subject: [PATCH] Implement local buffer for PropertyStore

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2605
---
 src/storage/v2/property_store.cpp        | 225 ++++++++++++++++++-----
 src/storage/v2/property_store.hpp        |   3 +-
 tests/unit/storage_v2_property_store.cpp |  91 +++++++++
 3 files changed, 273 insertions(+), 46 deletions(-)

diff --git a/src/storage/v2/property_store.cpp b/src/storage/v2/property_store.cpp
index 6a46bbf15..90fcb0cef 100644
--- a/src/storage/v2/property_store.cpp
+++ b/src/storage/v2/property_store.cpp
@@ -3,6 +3,7 @@
 #include <cstring>
 #include <limits>
 #include <optional>
+#include <tuple>
 #include <type_traits>
 #include <utility>
 
@@ -628,38 +629,102 @@ uint64_t ToPowerOf8(uint64_t size) {
   return size - mod + 8;
 }
 
+// The `PropertyStore` also uses a small buffer optimization in it. If the data
+// fits into the size of the internally stored pointer and size, then the
+// pointer and size are used as a in-place buffer. In order to be able to do
+// this we store a `union` of the two sets of data. Because the storage is a
+// `union`, only one set of information (pointer+size or buffer) can be used at
+// any time. The buffer perfectly overlaps with the memory locations of the
+// pointer+size. This is illustrated in the following diagram:
+//
+// Memory (hex):
+// 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
+// |---------------------|                         -> size
+//                         |---------------------| -> data
+// 0  1  2  3  4  5  6  7  8  9  10 11 12 13 14 15 -> buffer_ (positions)
+//
+// When we are using the pointer+size we know that the size must
+// be a multiple of 8 (because we always allocate a buffer whose size is a
+// multiple of 8). That means that the lower 3 bits of the `size` field must be
+// zero when the data is used as a pointer+size.
+//
+// Because this architecture is little-endian, we know that `buffer_[0]` will be
+// aligned with the lowest byte of the `size` field. When we use the inline
+// `buffer_` we write `kUseLocalBuffer` (which is exactly 1) to `buffer_[0]`
+// which will make the `size` read (independent of the other values in the
+// buffer) always not be a multiple of 8. We use that fact to distinguish which
+// of the two sets of data is currently active. Because the first byte of the
+// buffer is used to distinguish which of the two sets of data is used, we can
+// only use the leftover 15 bytes for raw data storage.
+#ifndef __x86_64__
+#error The PropertyStore only supports x86_64
+#endif
+
+const uint8_t kUseLocalBuffer = 0x01;
+
+// Helper functions used to retrieve/store `size` and `data` from/into the
+// `buffer_`.
+
+std::pair<uint64_t, uint8_t *> GetSizeData(const uint8_t *buffer) {
+  uint64_t size;
+  uint8_t *data;
+  memcpy(&size, buffer, sizeof(uint64_t));
+  memcpy(&data, buffer + sizeof(uint64_t), sizeof(uint8_t *));
+  return {size, data};
+}
+
+void SetSizeData(uint8_t *buffer, uint64_t size, uint8_t *data) {
+  memcpy(buffer, &size, sizeof(uint64_t));
+  memcpy(buffer + sizeof(uint64_t), &data, sizeof(uint8_t *));
+}
+
 }  // namespace
 
-PropertyStore::PropertyStore() {}
+PropertyStore::PropertyStore() { memset(buffer_, 0, sizeof(buffer_)); }
 
-PropertyStore::PropertyStore(PropertyStore &&other) noexcept
-    : data_(other.data_), size_(other.size_) {
-  other.data_ = nullptr;
-  other.size_ = 0;
+PropertyStore::PropertyStore(PropertyStore &&other) noexcept {
+  memcpy(buffer_, other.buffer_, sizeof(buffer_));
+  memset(other.buffer_, 0, sizeof(other.buffer_));
 }
 
 PropertyStore &PropertyStore::operator=(PropertyStore &&other) noexcept {
-  delete[] data_;
+  uint64_t size;
+  uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 == 0) {
+    // We are storing the data in an external buffer.
+    delete[] data;
+  }
 
-  data_ = other.data_;
-  size_ = other.size_;
-
-  other.data_ = nullptr;
-  other.size_ = 0;
+  memcpy(buffer_, other.buffer_, sizeof(buffer_));
+  memset(other.buffer_, 0, sizeof(other.buffer_));
 
   return *this;
 }
 
 PropertyStore::~PropertyStore() {
-  delete[] data_;
-  size_ = 0;
+  uint64_t size;
+  uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 == 0) {
+    // We are storing the data in an external buffer.
+    delete[] data;
+  }
 }
 
 PropertyValue PropertyStore::GetProperty(PropertyId property) const {
-  Reader reader(data_, size_);
+  uint64_t size;
+  const uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 != 0) {
+    // We are storing the data in the local buffer.
+    size = sizeof(buffer_) - 1;
+    data = &buffer_[1];
+  }
+  Reader reader(data, size);
   auto info = FindProperty(&reader, property);
   if (info.property_size == 0) return PropertyValue();
-  Reader prop_reader(data_ + info.property_begin, info.property_size);
+  Reader prop_reader(data + info.property_begin, info.property_size);
   auto prop = DecodeProperty(&prop_reader);
   CHECK(prop) << "Invalid database state!";
   CHECK(prop->first == property) << "Invalid database state!";
@@ -667,13 +732,29 @@ PropertyValue PropertyStore::GetProperty(PropertyId property) const {
 }
 
 bool PropertyStore::HasProperty(PropertyId property) const {
-  Reader reader(data_, size_);
+  uint64_t size;
+  const uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 != 0) {
+    // We are storing the data in the local buffer.
+    size = sizeof(buffer_) - 1;
+    data = &buffer_[1];
+  }
+  Reader reader(data, size);
   auto info = FindProperty(&reader, property);
   return info.property_size != 0;
 }
 
 std::map<PropertyId, PropertyValue> PropertyStore::Properties() const {
-  Reader reader(data_, size_);
+  uint64_t size;
+  const uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 != 0) {
+    // We are storing the data in the local buffer.
+    size = sizeof(buffer_) - 1;
+    data = &buffer_[1];
+  }
+  Reader reader(data, size);
   std::map<PropertyId, PropertyValue> props;
   while (true) {
     auto ret = DecodeProperty(&reader);
@@ -692,16 +773,42 @@ bool PropertyStore::SetProperty(PropertyId property,
     property_size = writer.Written();
   }
 
+  bool in_local_buffer = false;
+  uint64_t size;
+  uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 != 0) {
+    // We are storing the data in the local buffer.
+    size = sizeof(buffer_) - 1;
+    data = &buffer_[1];
+    in_local_buffer = true;
+  }
+
   bool existed = false;
-  if (!data_) {
+  if (!size) {
     if (!value.IsNull()) {
       // We don't have a data buffer. Allocate a new one.
-      auto size = ToPowerOf8(property_size);
-      data_ = new uint8_t[size];
-      size_ = size;
+      auto property_size_to_power_of_8 = ToPowerOf8(property_size);
+      if (property_size <= sizeof(buffer_) - 1) {
+        // Use the local buffer.
+        buffer_[0] = kUseLocalBuffer;
+        size = sizeof(buffer_) - 1;
+        data = &buffer_[1];
+        in_local_buffer = true;
+      } else {
+        // Allocate a new external buffer.
+        auto alloc_data = new uint8_t[property_size_to_power_of_8];
+        auto alloc_size = property_size_to_power_of_8;
+
+        SetSizeData(buffer_, alloc_size, alloc_data);
+
+        size = alloc_size;
+        data = alloc_data;
+        in_local_buffer = false;
+      }
 
       // Encode the property into the data buffer.
-      Writer writer(data_, size_);
+      Writer writer(data, size);
       CHECK(EncodeProperty(&writer, property, value))
           << "Invalid database state!";
       auto metadata = writer.WriteMetadata();
@@ -715,46 +822,67 @@ bool PropertyStore::SetProperty(PropertyId property,
       // to set a property to `Null` (we are trying to remove the property).
     }
   } else {
-    Reader reader(data_, size_);
+    Reader reader(data, size);
     auto info = FindProperty(&reader, property);
     existed = info.property_size != 0;
     auto new_size = info.all_size - info.property_size + property_size;
     auto new_size_to_power_of_8 = ToPowerOf8(new_size);
     if (new_size_to_power_of_8 == 0) {
       // We don't have any data to encode anymore.
-      delete[] data_;
-      data_ = nullptr;
-      size_ = 0;
-    } else if (new_size_to_power_of_8 > size_ ||
-               new_size_to_power_of_8 <= size_ * 2 / 3) {
+      if (!in_local_buffer) delete[] data;
+      SetSizeData(buffer_, 0, nullptr);
+      data = nullptr;
+      size = 0;
+    } else if (new_size_to_power_of_8 > size ||
+               new_size_to_power_of_8 <= size * 2 / 3) {
       // We need to enlarge/shrink the buffer.
-      auto buffer = new uint8_t[new_size_to_power_of_8];
+      bool current_in_local_buffer = false;
+      uint8_t *current_data = nullptr;
+      uint64_t current_size = 0;
+      if (new_size <= sizeof(buffer_) - 1) {
+        // Use the local buffer.
+        buffer_[0] = kUseLocalBuffer;
+        current_size = sizeof(buffer_) - 1;
+        current_data = &buffer_[1];
+        current_in_local_buffer = true;
+      } else {
+        // Allocate a new external buffer.
+        current_data = new uint8_t[new_size_to_power_of_8];
+        current_size = new_size_to_power_of_8;
+        current_in_local_buffer = false;
+      }
       // Copy everything before the property to the new buffer.
-      memcpy(buffer, data_, info.property_begin);
+      memmove(current_data, data, info.property_begin);
       // Copy everything after the property to the new buffer.
-      memcpy(buffer + info.property_begin + property_size,
-             data_ + info.property_end, info.all_end - info.property_end);
-      // Replace the current buffer with the new buffer.
-      delete[] data_;
-      data_ = buffer;
-      size_ = new_size_to_power_of_8;
+      memmove(current_data + info.property_begin + property_size,
+              data + info.property_end, info.all_end - info.property_end);
+      // Free the old buffer.
+      if (!in_local_buffer) delete[] data;
+      // Permanently remember the new buffer.
+      if (!current_in_local_buffer) {
+        SetSizeData(buffer_, current_size, current_data);
+      }
+      // Set the proxy variables.
+      data = current_data;
+      size = current_size;
+      in_local_buffer = current_in_local_buffer;
     } else if (property_size != info.property_size) {
       // We can keep the data in the same buffer, but the new property is
       // larger/smaller than the old property. We need to move the following
       // properties to the right/left.
-      memmove(data_ + info.property_begin + property_size,
-              data_ + info.property_end, info.all_end - info.property_end);
+      memmove(data + info.property_begin + property_size,
+              data + info.property_end, info.all_end - info.property_end);
     }
 
     if (!value.IsNull()) {
       // We need to encode the new value.
-      Writer writer(data_ + info.property_begin, property_size);
+      Writer writer(data + info.property_begin, property_size);
       CHECK(EncodeProperty(&writer, property, value))
           << "Invalid database state!";
     }
 
     // We need to recreate the tombstone (if possible).
-    Writer writer(data_ + new_size, size_ - new_size);
+    Writer writer(data + new_size, size - new_size);
     auto metadata = writer.WriteMetadata();
     if (metadata) {
       metadata->Set({Type::EMPTY});
@@ -765,10 +893,19 @@ bool PropertyStore::SetProperty(PropertyId property,
 }
 
 bool PropertyStore::ClearProperties() {
-  if (!data_) return false;
-  delete[] data_;
-  data_ = nullptr;
-  size_ = 0;
+  bool in_local_buffer = false;
+  uint64_t size;
+  uint8_t *data;
+  std::tie(size, data) = GetSizeData(buffer_);
+  if (size % 8 != 0) {
+    // We are storing the data in the local buffer.
+    size = sizeof(buffer_) - 1;
+    data = &buffer_[1];
+    in_local_buffer = true;
+  }
+  if (!size) return false;
+  if (!in_local_buffer) delete[] data;
+  SetSizeData(buffer_, 0, nullptr);
   return true;
 }
 
diff --git a/src/storage/v2/property_store.hpp b/src/storage/v2/property_store.hpp
index f60e337d7..afbebffcb 100644
--- a/src/storage/v2/property_store.hpp
+++ b/src/storage/v2/property_store.hpp
@@ -46,8 +46,7 @@ class PropertyStore {
   bool ClearProperties();
 
  private:
-  uint8_t *data_{nullptr};
-  uint64_t size_{0};
+  uint8_t buffer_[sizeof(uint64_t) + sizeof(uint8_t *)];
 };
 
 }  // namespace storage
diff --git a/tests/unit/storage_v2_property_store.cpp b/tests/unit/storage_v2_property_store.cpp
index 26a0c18e8..1da854b49 100644
--- a/tests/unit/storage_v2_property_store.cpp
+++ b/tests/unit/storage_v2_property_store.cpp
@@ -15,6 +15,26 @@ TEST(PropertyStore, Simple) {
   ASSERT_EQ(props.GetProperty(prop), value);
   ASSERT_TRUE(props.HasProperty(prop));
   ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
+
+  ASSERT_FALSE(props.SetProperty(prop, storage::PropertyValue()));
+  ASSERT_TRUE(props.GetProperty(prop).IsNull());
+  ASSERT_FALSE(props.HasProperty(prop));
+  ASSERT_EQ(props.Properties().size(), 0);
+}
+
+TEST(PropertyStore, SimpleLarge) {
+  storage::PropertyStore props;
+  auto prop = storage::PropertyId::FromInt(42);
+  auto value = storage::PropertyValue(std::string(10000, 'a'));
+  ASSERT_TRUE(props.SetProperty(prop, value));
+  ASSERT_EQ(props.GetProperty(prop), value);
+  ASSERT_TRUE(props.HasProperty(prop));
+  ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
+
+  ASSERT_FALSE(props.SetProperty(prop, storage::PropertyValue()));
+  ASSERT_TRUE(props.GetProperty(prop).IsNull());
+  ASSERT_FALSE(props.HasProperty(prop));
+  ASSERT_EQ(props.Properties().size(), 0);
 }
 
 TEST(PropertyStore, EmptySetToNull) {
@@ -26,6 +46,26 @@ TEST(PropertyStore, EmptySetToNull) {
   ASSERT_EQ(props.Properties().size(), 0);
 }
 
+TEST(PropertyStore, Clear) {
+  storage::PropertyStore props;
+  auto prop = storage::PropertyId::FromInt(42);
+  auto value = storage::PropertyValue(42);
+  ASSERT_TRUE(props.SetProperty(prop, value));
+  ASSERT_EQ(props.GetProperty(prop), value);
+  ASSERT_TRUE(props.HasProperty(prop));
+  ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
+  ASSERT_TRUE(props.ClearProperties());
+  ASSERT_TRUE(props.GetProperty(prop).IsNull());
+  ASSERT_FALSE(props.HasProperty(prop));
+  ASSERT_EQ(props.Properties().size(), 0);
+}
+
+TEST(PropertyStore, EmptyClear) {
+  storage::PropertyStore props;
+  ASSERT_FALSE(props.ClearProperties());
+  ASSERT_EQ(props.Properties().size(), 0);
+}
+
 TEST(PropertyStore, MoveConstruct) {
   storage::PropertyStore props1;
   auto prop = storage::PropertyId::FromInt(42);
@@ -48,6 +88,28 @@ TEST(PropertyStore, MoveConstruct) {
   ASSERT_EQ(props1.Properties().size(), 0);
 }
 
+TEST(PropertyStore, MoveConstructLarge) {
+  storage::PropertyStore props1;
+  auto prop = storage::PropertyId::FromInt(42);
+  auto value = storage::PropertyValue(std::string(10000, 'a'));
+  ASSERT_TRUE(props1.SetProperty(prop, value));
+  ASSERT_EQ(props1.GetProperty(prop), value);
+  ASSERT_TRUE(props1.HasProperty(prop));
+  ASSERT_THAT(props1.Properties(),
+              UnorderedElementsAre(std::pair(prop, value)));
+  {
+    storage::PropertyStore props2(std::move(props1));
+    ASSERT_EQ(props2.GetProperty(prop), value);
+    ASSERT_TRUE(props2.HasProperty(prop));
+    ASSERT_THAT(props2.Properties(),
+                UnorderedElementsAre(std::pair(prop, value)));
+  }
+  // NOLINTNEXTLINE(bugprone-use-after-move,clang-analyzer-cplusplus.Move,hicpp-invalid-access-moved)
+  ASSERT_TRUE(props1.GetProperty(prop).IsNull());
+  ASSERT_FALSE(props1.HasProperty(prop));
+  ASSERT_EQ(props1.Properties().size(), 0);
+}
+
 TEST(PropertyStore, MoveAssign) {
   storage::PropertyStore props1;
   auto prop = storage::PropertyId::FromInt(42);
@@ -77,6 +139,35 @@ TEST(PropertyStore, MoveAssign) {
   ASSERT_EQ(props1.Properties().size(), 0);
 }
 
+TEST(PropertyStore, MoveAssignLarge) {
+  storage::PropertyStore props1;
+  auto prop = storage::PropertyId::FromInt(42);
+  auto value = storage::PropertyValue(std::string(10000, 'a'));
+  ASSERT_TRUE(props1.SetProperty(prop, value));
+  ASSERT_EQ(props1.GetProperty(prop), value);
+  ASSERT_TRUE(props1.HasProperty(prop));
+  ASSERT_THAT(props1.Properties(),
+              UnorderedElementsAre(std::pair(prop, value)));
+  {
+    auto value2 = storage::PropertyValue(std::string(10000, 'b'));
+    storage::PropertyStore props2;
+    ASSERT_TRUE(props2.SetProperty(prop, value2));
+    ASSERT_EQ(props2.GetProperty(prop), value2);
+    ASSERT_TRUE(props2.HasProperty(prop));
+    ASSERT_THAT(props2.Properties(),
+                UnorderedElementsAre(std::pair(prop, value2)));
+    props2 = std::move(props1);
+    ASSERT_EQ(props2.GetProperty(prop), value);
+    ASSERT_TRUE(props2.HasProperty(prop));
+    ASSERT_THAT(props2.Properties(),
+                UnorderedElementsAre(std::pair(prop, value)));
+  }
+  // NOLINTNEXTLINE(bugprone-use-after-move,clang-analyzer-cplusplus.Move,hicpp-invalid-access-moved)
+  ASSERT_TRUE(props1.GetProperty(prop).IsNull());
+  ASSERT_FALSE(props1.HasProperty(prop));
+  ASSERT_EQ(props1.Properties().size(), 0);
+}
+
 TEST(PropertyStore, EmptySet) {
   std::vector<storage::PropertyValue> vec{storage::PropertyValue(true),
                                           storage::PropertyValue(123),