Compare commits

...

5 Commits

Author SHA1 Message Date
Antonio Andelic
77c8a650fa Merge branch 'master' into T0515-MG-fixed-size-pool-resource 2021-10-28 12:58:04 +02:00
Antonio Andelic
e27485388d Use FixedSizePool 2021-06-17 11:39:30 +02:00
Antonio Andelic
51b5d81018 Merge branch 'master' into T0515-MG-fixed-size-pool-resource 2021-06-17 11:36:32 +02:00
Antonio Andelic
10cc0c01d2 Add tests for FixedSizePoolResource 2021-03-22 14:06:00 +01:00
Antonio Andelic
45cd78a435 Add FixedSizePoolResource PoC 2021-03-22 14:05:47 +01:00
5 changed files with 396 additions and 3 deletions

View File

@ -830,7 +830,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
// Also, we want to throw only when the query engine requests more memory and not the storage
// so we add the exception to the allocator.
// TODO (mferencevic): Tune the parameters accordingly.
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
utils::FixedSizePoolResource pool_memory(128, 1024, &monotonic_memory);
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
if (memory_limit_) {

View File

@ -13,6 +13,7 @@
#include <algorithm>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <type_traits>
@ -164,8 +165,9 @@ void *Pool::Allocate() {
--chunk->blocks_available;
return available_block;
};
if (last_alloc_chunk_ && last_alloc_chunk_->blocks_available > 0U)
if (last_alloc_chunk_ && last_alloc_chunk_->blocks_available > 0U) {
return allocate_block_from_chunk(last_alloc_chunk_);
}
// Find a Chunk with available memory.
for (auto &chunk : chunks_) {
if (chunk.blocks_available > 0U) {
@ -347,4 +349,141 @@ void PoolResource::Release() {
// PoolResource END
namespace impl {
namespace {
constexpr size_t kInitialChunkSize = 1;
} // namespace
FixedSizePool::FixedSizePool(const size_t block_size, const size_t max_blocks_per_chunk_,
utils::MemoryResource *upstream)
: block_size_{block_size},
max_blocks_per_chunk_{max_blocks_per_chunk_},
chunk_size_{kInitialChunkSize},
upstream_{upstream} {
internal_block_size_ = RoundUp(block_size, alignof(std::max_align_t));
}
void *FixedSizePool::Allocate() {
if (begin_ == end_) {
if (free_list_) {
Link *p = free_list_;
free_list_ = p->next;
return p;
}
Replenish();
}
auto *p = begin_;
begin_ += internal_block_size_;
return p;
}
void FixedSizePool::Deallocate(void *p) {
static_cast<Link *>(p)->next = free_list_;
free_list_ = static_cast<Link *>(p);
}
void FixedSizePool::Release() {
chunk_list_.Release(upstream_);
chunk_size_ = kInitialChunkSize;
begin_ = nullptr;
end_ = nullptr;
free_list_ = nullptr;
}
FixedSizePool::~FixedSizePool() { Release(); }
void FixedSizePool::Replenish() {
begin_ = static_cast<std::byte *>(chunk_list_.Allocate(chunk_size_ * internal_block_size_, upstream_));
end_ = begin_ + chunk_size_ * internal_block_size_;
// TODO (antonio2368): Add more growth strategies
const auto next_chunk_size = chunk_size_ * 2;
if (next_chunk_size <= max_blocks_per_chunk_) {
chunk_size_ = next_chunk_size;
}
}
} // namespace impl
namespace {
inline uint64_t CeilLog2(const uint64_t x) { return utils::Log2(x) + !utils::IsPow2(x); }
} // namespace
FixedSizePoolResource::FixedSizePoolResource(const size_t max_blocks_per_chunk, const size_t max_block_size,
utils::MemoryResource *upstream)
: upstream_{upstream} {
max_block_size_ = min_block_size_;
num_pools_ = CeilLog2(max_block_size) - CeilLog2(min_block_size_) + 1;
MG_ASSERT(num_pools_ > 0, "Invalid max size of the block.");
auto allocator = Allocator<impl::FixedSizePool>(upstream_);
pools_ = allocator.allocate(num_pools_);
for (size_t i = 0; i < num_pools_; ++i) {
allocator.construct(pools_ + i, max_block_size_, max_blocks_per_chunk, upstream);
MG_ASSERT(max_block_size_ <= std::numeric_limits<size_t>::max() / 2);
max_block_size_ *= 2;
}
max_block_size_ /= 2;
}
void *FixedSizePoolResource::DoAllocate(const size_t bytes, const size_t alignment) {
MG_ASSERT(bytes > 0);
void *ret{nullptr};
if (bytes <= max_block_size_) {
auto *pool = FindPool(bytes);
ret = pool->Allocate();
} else {
// add to the list of allocated chunks
ret = large_blocks_.Allocate(bytes, upstream_);
}
return ret;
}
void FixedSizePoolResource::DoDeallocate(void *p, const size_t bytes, const size_t alignment) {
auto *pool = FindPool(bytes);
if (pool == nullptr) {
large_blocks_.Deallocate(p, upstream_);
} else {
pool->Deallocate(p);
}
}
impl::FixedSizePool *FixedSizePoolResource::FindPool(const size_t size) const {
if (size > max_block_size_) {
return nullptr;
}
auto *pool = pools_;
while (size > pool->BlockSize()) {
++pool;
}
return pool;
}
void FixedSizePoolResource::Release() {
for (size_t i = 0; i < num_pools_; ++i) {
pools_[i].Release();
}
large_blocks_.Release(upstream_);
}
FixedSizePoolResource::~FixedSizePoolResource() {
Release();
for (size_t i = 0; i < num_pools_; ++i) {
pools_[i].~FixedSizePool();
}
upstream_->Deallocate(pools_, sizeof(impl::FixedSizePool) * num_pools_, alignof(impl::FixedSizePool));
}
} // namespace utils

View File

@ -16,6 +16,7 @@
#pragma once
#include <cstddef>
#include <list>
#include <memory>
#include <mutex>
#include <new>
@ -23,6 +24,9 @@
#include <type_traits>
#include <utility>
#include <vector>
#ifndef NDEBUG
#include <iostream>
#endif
// Although <memory_resource> is in C++17, gcc libstdc++ still needs to
// implement it fully. It should be available in the next major release
// version, i.e. gcc 9.x.
@ -564,6 +568,217 @@ class LimitedMemoryResource final : public utils::MemoryResource {
bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
};
namespace impl {
// https://stackoverflow.com/a/9194117
// if y is power 2 we can do (x + y - 1) & ~(y - 1);
// https://stackoverflow.com/a/14561794
constexpr size_t RoundUp(const size_t x, const size_t y) {
MG_ASSERT(y >= 1);
return (x + y - 1) / y * y;
}
struct LinkedListNode {
LinkedListNode *next;
LinkedListNode *previous;
};
struct BlockLinkedListHeaderBase : public LinkedListNode {
size_t size;
using NodeType = LinkedListNode;
};
struct SingleLinkedListNode {
SingleLinkedListNode *next;
};
struct BlockSingleLinkedListHeaderBase : public SingleLinkedListNode {
size_t size;
using NodeType = SingleLinkedListNode;
};
template <typename T>
concept BlockLinkedListHeader = std::is_base_of_v<BlockLinkedListHeaderBase, T>;
template <typename T>
concept BlockSingleLinkedListHeader = std::is_base_of_v<BlockSingleLinkedListHeaderBase, T>;
template <typename T>
concept BlockListHeader = BlockLinkedListHeader<T> || BlockSingleLinkedListHeader<T>;
template <BlockListHeader THeader>
class BlockList {
using NodeType = typename THeader::NodeType;
public:
static const size_t header_size = RoundUp(sizeof(THeader), alignof(std::max_align_t));
explicit BlockList() {
// create a circular linked list
list_.next = &list_;
if constexpr (BlockLinkedListHeader<THeader>) {
list_.previous = &list_;
}
}
void *Allocate(const size_t bytes, utils::MemoryResource *rsrc) {
if (bytes > static_cast<size_t>(-1) - header_size) {
throw utils::BadAlloc("Requested too many bytes to allocate");
}
void *p = rsrc->Allocate(bytes + header_size);
auto *node = new (p) THeader;
node->size = bytes + header_size;
auto *next = list_.next;
node->next = next;
list_.next = node;
if constexpr (BlockLinkedListHeader<THeader>) {
node->previous = &list_;
next->previous = node;
}
return static_cast<std::byte *>(p) + header_size;
}
template <typename T = THeader>
requires BlockLinkedListHeader<T> void Deallocate(void *p, utils::MemoryResource *rsrc) {
auto *header = static_cast<THeader *>(static_cast<void *>(static_cast<std::byte *>(p) - header_size));
auto *next = header->next;
auto *previous = header->previous;
previous->next = next;
next->previous = previous;
const size_t size = header->size;
header->~THeader();
rsrc->Deallocate(header, size, alignof(std::max_align_t));
}
void Release(utils::MemoryResource *rsrc) {
NodeType *node = list_.next;
while (node != &list_) {
auto *header = static_cast<THeader *>(node);
node = node->next;
const size_t size = header->size;
header->~THeader();
rsrc->Deallocate(header, size, alignof(std::max_align_t));
}
// create a circular linked list
list_.next = &list_;
if constexpr (BlockLinkedListHeader<THeader>) {
list_.previous = &list_;
}
}
private:
NodeType list_;
};
// https://github.com/bloomberg/bde/blob/master/groups/bdl/bdlma/bdlma_pool.h
class FixedSizePool final {
public:
FixedSizePool(size_t block_size, size_t max_blocks_per_chunk, utils::MemoryResource *upstream);
FixedSizePool(const FixedSizePool &) = delete;
FixedSizePool &operator=(const FixedSizePool &) = delete;
FixedSizePool(FixedSizePool &&) = default;
FixedSizePool &operator=(FixedSizePool &&) = default;
void *Allocate();
void Deallocate(void *p);
void Release();
auto BlockSize() const { return block_size_; }
~FixedSizePool();
private:
struct Link {
Link *next;
};
std::byte *begin_{nullptr};
std::byte *end_{nullptr};
Link *free_list_{nullptr};
size_t internal_block_size_;
size_t block_size_;
size_t max_blocks_per_chunk_;
size_t chunk_size_;
BlockList<BlockSingleLinkedListHeaderBase> chunk_list_;
utils::MemoryResource *upstream_;
void Replenish();
};
} // namespace impl
// https://github.com/bloomberg/bde/blob/master/groups/bdl/bdlma/bdlma_multipool.h
class FixedSizePoolResource : public MemoryResource {
public:
explicit FixedSizePoolResource(size_t max_blocks_per_chunk, size_t max_block_size,
utils::MemoryResource *upstream = utils::NewDeleteResource());
void Release();
FixedSizePoolResource(const FixedSizePoolResource &) = delete;
FixedSizePoolResource &operator=(const FixedSizePoolResource &) = delete;
FixedSizePoolResource(FixedSizePoolResource &&) = default;
FixedSizePoolResource &operator=(FixedSizePoolResource &&) = default;
~FixedSizePoolResource() override;
private:
impl::FixedSizePool *FindPool(size_t size) const;
void *DoAllocate(size_t bytes, size_t alignment) override;
void DoDeallocate(void *p, size_t bytes, size_t alignment) override;
bool DoIsEqual(const MemoryResource &other) const noexcept override { return this == &other; }
impl::FixedSizePool *pools_{nullptr};
size_t max_block_size_;
size_t min_block_size_{8};
size_t num_pools_{0};
impl::BlockList<impl::BlockLinkedListHeaderBase> large_blocks_;
utils::MemoryResource *upstream_;
};
#ifndef NDEBUG
class PrintMemoryResource : public MemoryResource {
public:
explicit PrintMemoryResource(utils::MemoryResource *memory) : memory_(memory) {}
private:
void *DoAllocate(size_t bytes, size_t alignment) override {
std::cout << "Allocating " << bytes << std::endl;
return memory_->Allocate(bytes, alignment);
}
void DoDeallocate(void *p, size_t bytes, size_t alignment) override {
std::cout << "Deallocating " << bytes << std::endl;
return memory_->Deallocate(p, bytes, alignment);
}
bool DoIsEqual(const utils::MemoryResource &other) const noexcept override { return memory_->IsEqual(other); }
utils::MemoryResource *memory_;
};
#endif
// Allocate memory with the OutOfMemoryException enabled if the requested size
// puts total allocated amount over the limit.
class ResourceWithOutOfMemoryException : public MemoryResource {

View File

@ -11,7 +11,6 @@
#include <gflags/gflags.h>
#include <mgclient.hpp>
#include <algorithm>
#include "utils/logging.hpp"
#include "utils/timer.hpp"

View File

@ -321,6 +321,46 @@ TEST(PoolResource, BlockDeallocation) {
EXPECT_EQ(test_mem.new_count_, 0U);
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(FixedSizePoolResource, SmallBlockAllocations) {
TestMemory test_mem;
const size_t max_blocks_per_chunk = 3U;
const size_t max_block_size = 1024U;
utils::FixedSizePoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
std::array<void *, max_block_size> blocks;
for (size_t i = 0; i < max_block_size; ++i) {
blocks[i] = CheckAllocation(&mem, i + 1);
}
for (size_t i = 0; i < max_block_size; ++i) {
mem.Deallocate(blocks[i], i + 1);
}
mem.Release();
CheckAllocation(&mem, 64U);
}
// NOLINTNEXTLINE(hicpp-special-member-functions)
TEST(FixedSizePoolResource, BigBlockAllocations) {
TestMemory test_mem;
const size_t max_blocks_per_chunk = 3U;
const size_t max_block_size = 1024U;
utils::FixedSizePoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
constexpr size_t block_num = 10;
std::array<void *, block_num> blocks;
for (size_t i = 0; i < block_num; ++i) {
blocks[i] = CheckAllocation(&mem, max_block_size * (i + 1));
}
for (size_t i = 0; i < block_num; ++i) {
mem.Deallocate(blocks[i], max_block_size * (i + 1));
}
mem.Release();
CheckAllocation(&mem, max_block_size * block_num);
}
class AllocationTrackingMemory final : public utils::MemoryResource {
public:
std::vector<size_t> allocated_sizes_;