Implement local buffer for PropertyStore

Reviewers: teon.banek, ipaljak

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision:
This commit is contained in:
Matej Ferencevic 2019-12-23 13:58:36 +01:00
parent b3d1cd8257
commit b5e255b896
3 changed files with 273 additions and 46 deletions

View File

@ -3,6 +3,7 @@
#include <cstring>
#include <limits>
#include <optional>
#include <tuple>
#include <type_traits>
#include <utility>
@ -628,38 +629,102 @@ uint64_t ToPowerOf8(uint64_t size) {
return size - mod + 8;
// The `PropertyStore` also uses a small buffer optimization in it. If the data
// fits into the size of the internally stored pointer and size, then the
// pointer and size are used as a in-place buffer. In order to be able to do
// this we store a `union` of the two sets of data. Because the storage is a
// `union`, only one set of information (pointer+size or buffer) can be used at
// any time. The buffer perfectly overlaps with the memory locations of the
// pointer+size. This is illustrated in the following diagram:
// Memory (hex):
// 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00
// |---------------------| -> size
// |---------------------| -> data
// 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 -> buffer_ (positions)
// When we are using the pointer+size we know that the size must
// be a multiple of 8 (because we always allocate a buffer whose size is a
// multiple of 8). That means that the lower 3 bits of the `size` field must be
// zero when the data is used as a pointer+size.
// Because this architecture is little-endian, we know that `buffer_[0]` will be
// aligned with the lowest byte of the `size` field. When we use the inline
// `buffer_` we write `kUseLocalBuffer` (which is exactly 1) to `buffer_[0]`
// which will make the `size` read (independent of the other values in the
// buffer) always not be a multiple of 8. We use that fact to distinguish which
// of the two sets of data is currently active. Because the first byte of the
// buffer is used to distinguish which of the two sets of data is used, we can
// only use the leftover 15 bytes for raw data storage.
#ifndef __x86_64__
#error The PropertyStore only supports x86_64
const uint8_t kUseLocalBuffer = 0x01;
// Helper functions used to retrieve/store `size` and `data` from/into the
// `buffer_`.
std::pair<uint64_t, uint8_t *> GetSizeData(const uint8_t *buffer) {
uint64_t size;
uint8_t *data;
memcpy(&size, buffer, sizeof(uint64_t));
memcpy(&data, buffer + sizeof(uint64_t), sizeof(uint8_t *));
return {size, data};
void SetSizeData(uint8_t *buffer, uint64_t size, uint8_t *data) {
memcpy(buffer, &size, sizeof(uint64_t));
memcpy(buffer + sizeof(uint64_t), &data, sizeof(uint8_t *));
} // namespace
PropertyStore::PropertyStore() {}
PropertyStore::PropertyStore() { memset(buffer_, 0, sizeof(buffer_)); }
PropertyStore::PropertyStore(PropertyStore &&other) noexcept
: data_(other.data_), size_(other.size_) {
other.data_ = nullptr;
other.size_ = 0;
PropertyStore::PropertyStore(PropertyStore &&other) noexcept {
memcpy(buffer_, other.buffer_, sizeof(buffer_));
memset(other.buffer_, 0, sizeof(other.buffer_));
PropertyStore &PropertyStore::operator=(PropertyStore &&other) noexcept {
delete[] data_;
uint64_t size;
uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 == 0) {
// We are storing the data in an external buffer.
delete[] data;
data_ = other.data_;
size_ = other.size_;
other.data_ = nullptr;
other.size_ = 0;
memcpy(buffer_, other.buffer_, sizeof(buffer_));
memset(other.buffer_, 0, sizeof(other.buffer_));
return *this;
PropertyStore::~PropertyStore() {
delete[] data_;
size_ = 0;
uint64_t size;
uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 == 0) {
// We are storing the data in an external buffer.
delete[] data;
PropertyValue PropertyStore::GetProperty(PropertyId property) const {
Reader reader(data_, size_);
uint64_t size;
const uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 != 0) {
// We are storing the data in the local buffer.
size = sizeof(buffer_) - 1;
data = &buffer_[1];
Reader reader(data, size);
auto info = FindProperty(&reader, property);
if (info.property_size == 0) return PropertyValue();
Reader prop_reader(data_ + info.property_begin, info.property_size);
Reader prop_reader(data + info.property_begin, info.property_size);
auto prop = DecodeProperty(&prop_reader);
CHECK(prop) << "Invalid database state!";
CHECK(prop->first == property) << "Invalid database state!";
@ -667,13 +732,29 @@ PropertyValue PropertyStore::GetProperty(PropertyId property) const {
bool PropertyStore::HasProperty(PropertyId property) const {
Reader reader(data_, size_);
uint64_t size;
const uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 != 0) {
// We are storing the data in the local buffer.
size = sizeof(buffer_) - 1;
data = &buffer_[1];
Reader reader(data, size);
auto info = FindProperty(&reader, property);
return info.property_size != 0;
std::map<PropertyId, PropertyValue> PropertyStore::Properties() const {
Reader reader(data_, size_);
uint64_t size;
const uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 != 0) {
// We are storing the data in the local buffer.
size = sizeof(buffer_) - 1;
data = &buffer_[1];
Reader reader(data, size);
std::map<PropertyId, PropertyValue> props;
while (true) {
auto ret = DecodeProperty(&reader);
@ -692,16 +773,42 @@ bool PropertyStore::SetProperty(PropertyId property,
property_size = writer.Written();
bool in_local_buffer = false;
uint64_t size;
uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 != 0) {
// We are storing the data in the local buffer.
size = sizeof(buffer_) - 1;
data = &buffer_[1];
in_local_buffer = true;
bool existed = false;
if (!data_) {
if (!size) {
if (!value.IsNull()) {
// We don't have a data buffer. Allocate a new one.
auto size = ToPowerOf8(property_size);
data_ = new uint8_t[size];
size_ = size;
auto property_size_to_power_of_8 = ToPowerOf8(property_size);
if (property_size <= sizeof(buffer_) - 1) {
// Use the local buffer.
buffer_[0] = kUseLocalBuffer;
size = sizeof(buffer_) - 1;
data = &buffer_[1];
in_local_buffer = true;
} else {
// Allocate a new external buffer.
auto alloc_data = new uint8_t[property_size_to_power_of_8];
auto alloc_size = property_size_to_power_of_8;
SetSizeData(buffer_, alloc_size, alloc_data);
size = alloc_size;
data = alloc_data;
in_local_buffer = false;
// Encode the property into the data buffer.
Writer writer(data_, size_);
Writer writer(data, size);
CHECK(EncodeProperty(&writer, property, value))
<< "Invalid database state!";
auto metadata = writer.WriteMetadata();
@ -715,46 +822,67 @@ bool PropertyStore::SetProperty(PropertyId property,
// to set a property to `Null` (we are trying to remove the property).
} else {
Reader reader(data_, size_);
Reader reader(data, size);
auto info = FindProperty(&reader, property);
existed = info.property_size != 0;
auto new_size = info.all_size - info.property_size + property_size;
auto new_size_to_power_of_8 = ToPowerOf8(new_size);
if (new_size_to_power_of_8 == 0) {
// We don't have any data to encode anymore.
delete[] data_;
data_ = nullptr;
size_ = 0;
} else if (new_size_to_power_of_8 > size_ ||
new_size_to_power_of_8 <= size_ * 2 / 3) {
if (!in_local_buffer) delete[] data;
SetSizeData(buffer_, 0, nullptr);
data = nullptr;
size = 0;
} else if (new_size_to_power_of_8 > size ||
new_size_to_power_of_8 <= size * 2 / 3) {
// We need to enlarge/shrink the buffer.
auto buffer = new uint8_t[new_size_to_power_of_8];
bool current_in_local_buffer = false;
uint8_t *current_data = nullptr;
uint64_t current_size = 0;
if (new_size <= sizeof(buffer_) - 1) {
// Use the local buffer.
buffer_[0] = kUseLocalBuffer;
current_size = sizeof(buffer_) - 1;
current_data = &buffer_[1];
current_in_local_buffer = true;
} else {
// Allocate a new external buffer.
current_data = new uint8_t[new_size_to_power_of_8];
current_size = new_size_to_power_of_8;
current_in_local_buffer = false;
// Copy everything before the property to the new buffer.
memcpy(buffer, data_, info.property_begin);
memmove(current_data, data, info.property_begin);
// Copy everything after the property to the new buffer.
memcpy(buffer + info.property_begin + property_size,
data_ + info.property_end, info.all_end - info.property_end);
// Replace the current buffer with the new buffer.
delete[] data_;
data_ = buffer;
size_ = new_size_to_power_of_8;
memmove(current_data + info.property_begin + property_size,
data + info.property_end, info.all_end - info.property_end);
// Free the old buffer.
if (!in_local_buffer) delete[] data;
// Permanently remember the new buffer.
if (!current_in_local_buffer) {
SetSizeData(buffer_, current_size, current_data);
// Set the proxy variables.
data = current_data;
size = current_size;
in_local_buffer = current_in_local_buffer;
} else if (property_size != info.property_size) {
// We can keep the data in the same buffer, but the new property is
// larger/smaller than the old property. We need to move the following
// properties to the right/left.
memmove(data_ + info.property_begin + property_size,
data_ + info.property_end, info.all_end - info.property_end);
memmove(data + info.property_begin + property_size,
data + info.property_end, info.all_end - info.property_end);
if (!value.IsNull()) {
// We need to encode the new value.
Writer writer(data_ + info.property_begin, property_size);
Writer writer(data + info.property_begin, property_size);
CHECK(EncodeProperty(&writer, property, value))
<< "Invalid database state!";
// We need to recreate the tombstone (if possible).
Writer writer(data_ + new_size, size_ - new_size);
Writer writer(data + new_size, size - new_size);
auto metadata = writer.WriteMetadata();
if (metadata) {
@ -765,10 +893,19 @@ bool PropertyStore::SetProperty(PropertyId property,
bool PropertyStore::ClearProperties() {
if (!data_) return false;
delete[] data_;
data_ = nullptr;
size_ = 0;
bool in_local_buffer = false;
uint64_t size;
uint8_t *data;
std::tie(size, data) = GetSizeData(buffer_);
if (size % 8 != 0) {
// We are storing the data in the local buffer.
size = sizeof(buffer_) - 1;
data = &buffer_[1];
in_local_buffer = true;
if (!size) return false;
if (!in_local_buffer) delete[] data;
SetSizeData(buffer_, 0, nullptr);
return true;

View File

@ -46,8 +46,7 @@ class PropertyStore {
bool ClearProperties();
uint8_t *data_{nullptr};
uint64_t size_{0};
uint8_t buffer_[sizeof(uint64_t) + sizeof(uint8_t *)];
} // namespace storage

View File

@ -15,6 +15,26 @@ TEST(PropertyStore, Simple) {
ASSERT_EQ(props.GetProperty(prop), value);
ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
ASSERT_FALSE(props.SetProperty(prop, storage::PropertyValue()));
ASSERT_EQ(props.Properties().size(), 0);
TEST(PropertyStore, SimpleLarge) {
storage::PropertyStore props;
auto prop = storage::PropertyId::FromInt(42);
auto value = storage::PropertyValue(std::string(10000, 'a'));
ASSERT_TRUE(props.SetProperty(prop, value));
ASSERT_EQ(props.GetProperty(prop), value);
ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
ASSERT_FALSE(props.SetProperty(prop, storage::PropertyValue()));
ASSERT_EQ(props.Properties().size(), 0);
TEST(PropertyStore, EmptySetToNull) {
@ -26,6 +46,26 @@ TEST(PropertyStore, EmptySetToNull) {
ASSERT_EQ(props.Properties().size(), 0);
TEST(PropertyStore, Clear) {
storage::PropertyStore props;
auto prop = storage::PropertyId::FromInt(42);
auto value = storage::PropertyValue(42);
ASSERT_TRUE(props.SetProperty(prop, value));
ASSERT_EQ(props.GetProperty(prop), value);
ASSERT_THAT(props.Properties(), UnorderedElementsAre(std::pair(prop, value)));
ASSERT_EQ(props.Properties().size(), 0);
TEST(PropertyStore, EmptyClear) {
storage::PropertyStore props;
ASSERT_EQ(props.Properties().size(), 0);
TEST(PropertyStore, MoveConstruct) {
storage::PropertyStore props1;
auto prop = storage::PropertyId::FromInt(42);
@ -48,6 +88,28 @@ TEST(PropertyStore, MoveConstruct) {
ASSERT_EQ(props1.Properties().size(), 0);
TEST(PropertyStore, MoveConstructLarge) {
storage::PropertyStore props1;
auto prop = storage::PropertyId::FromInt(42);
auto value = storage::PropertyValue(std::string(10000, 'a'));
ASSERT_TRUE(props1.SetProperty(prop, value));
ASSERT_EQ(props1.GetProperty(prop), value);
UnorderedElementsAre(std::pair(prop, value)));
storage::PropertyStore props2(std::move(props1));
ASSERT_EQ(props2.GetProperty(prop), value);
UnorderedElementsAre(std::pair(prop, value)));
// NOLINTNEXTLINE(bugprone-use-after-move,clang-analyzer-cplusplus.Move,hicpp-invalid-access-moved)
ASSERT_EQ(props1.Properties().size(), 0);
TEST(PropertyStore, MoveAssign) {
storage::PropertyStore props1;
auto prop = storage::PropertyId::FromInt(42);
@ -77,6 +139,35 @@ TEST(PropertyStore, MoveAssign) {
ASSERT_EQ(props1.Properties().size(), 0);
TEST(PropertyStore, MoveAssignLarge) {
storage::PropertyStore props1;
auto prop = storage::PropertyId::FromInt(42);
auto value = storage::PropertyValue(std::string(10000, 'a'));
ASSERT_TRUE(props1.SetProperty(prop, value));
ASSERT_EQ(props1.GetProperty(prop), value);
UnorderedElementsAre(std::pair(prop, value)));
auto value2 = storage::PropertyValue(std::string(10000, 'b'));
storage::PropertyStore props2;
ASSERT_TRUE(props2.SetProperty(prop, value2));
ASSERT_EQ(props2.GetProperty(prop), value2);
UnorderedElementsAre(std::pair(prop, value2)));
props2 = std::move(props1);
ASSERT_EQ(props2.GetProperty(prop), value);
UnorderedElementsAre(std::pair(prop, value)));
// NOLINTNEXTLINE(bugprone-use-after-move,clang-analyzer-cplusplus.Move,hicpp-invalid-access-moved)
ASSERT_EQ(props1.Properties().size(), 0);
TEST(PropertyStore, EmptySet) {
std::vector<storage::PropertyValue> vec{storage::PropertyValue(true),