Use stack for initial memory of MonotonicBufferResource in Pull
Reviewers: mtomic, mferencevic Reviewed By: mtomic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2139
This commit is contained in:
parent
e9dcb1dcf2
commit
775930ef4e
@ -95,8 +95,6 @@ class Interpreter {
|
||||
plan_(plan),
|
||||
execution_memory_(std::make_unique<utils::MonotonicBufferResource>(
|
||||
kExecutionMemoryBlockSize)),
|
||||
per_pull_memory_(std::make_unique<utils::MonotonicBufferResource>(
|
||||
kExecutionMemoryBlockSize)),
|
||||
cursor_(
|
||||
plan_->plan().MakeCursor(db_accessor, execution_memory_.get())),
|
||||
frame_(plan_->symbol_table().max_position(), execution_memory_.get()),
|
||||
@ -107,7 +105,6 @@ class Interpreter {
|
||||
should_abort_query_(should_abort_query) {
|
||||
ctx_.is_profile_query = is_profile_query;
|
||||
ctx_.symbol_table = plan_->symbol_table();
|
||||
ctx_.evaluation_context.memory = per_pull_memory_.get();
|
||||
ctx_.evaluation_context.timestamp =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch())
|
||||
@ -138,9 +135,17 @@ class Interpreter {
|
||||
template <typename TStream>
|
||||
bool Pull(TStream &stream) {
|
||||
utils::Timer timer;
|
||||
per_pull_memory_->Release();
|
||||
// Setup temporary memory for a single Pull. Initial memory should come
|
||||
// from stack, 256 KiB should fit on the stack and should be more than
|
||||
// enough for a single Pull.
|
||||
constexpr size_t stack_size = 256 * 1024;
|
||||
char stack_data[stack_size];
|
||||
utils::MonotonicBufferResource memory(&stack_data[0], stack_size);
|
||||
ctx_.evaluation_context.memory = &memory;
|
||||
// We can now Pull a result.
|
||||
bool return_value = cursor_->Pull(frame_, ctx_);
|
||||
if (return_value && !output_symbols_.empty()) {
|
||||
// TODO: The streamed values should also probably use the above memory.
|
||||
std::vector<TypedValue> values;
|
||||
values.reserve(output_symbols_.size());
|
||||
for (const auto &symbol : output_symbols_) {
|
||||
@ -183,11 +188,9 @@ class Interpreter {
|
||||
private:
|
||||
ExecutionContext ctx_;
|
||||
std::shared_ptr<CachedPlan> plan_;
|
||||
// execution_memory_ and per_pull_memory_ are unique_ptr, because we are
|
||||
// passing the address to cursor_, and we want to preserve the pointer in
|
||||
// case we get moved.
|
||||
// execution_memory_ is unique_ptr, because we are passing the address to
|
||||
// cursor_, and we want to preserve the pointer in case we get moved.
|
||||
std::unique_ptr<utils::MonotonicBufferResource> execution_memory_;
|
||||
std::unique_ptr<utils::MonotonicBufferResource> per_pull_memory_;
|
||||
query::plan::UniqueCursorPtr cursor_;
|
||||
Frame frame_;
|
||||
std::vector<Symbol> output_symbols_;
|
||||
|
@ -41,10 +41,21 @@ MonotonicBufferResource::MonotonicBufferResource(size_t initial_size,
|
||||
MemoryResource *memory)
|
||||
: memory_(memory), initial_size_(initial_size) {}
|
||||
|
||||
MonotonicBufferResource::MonotonicBufferResource(void *buffer,
|
||||
size_t buffer_size,
|
||||
MemoryResource *memory)
|
||||
: memory_(memory),
|
||||
initial_buffer_(buffer),
|
||||
initial_size_(buffer_size),
|
||||
next_buffer_size_(GrowMonotonicBuffer(
|
||||
initial_size_, std::numeric_limits<size_t>::max() - sizeof(Buffer))) {
|
||||
}
|
||||
|
||||
MonotonicBufferResource::MonotonicBufferResource(
|
||||
MonotonicBufferResource &&other) noexcept
|
||||
: memory_(other.memory_),
|
||||
current_buffer_(other.current_buffer_),
|
||||
initial_buffer_(other.initial_buffer_),
|
||||
initial_size_(other.initial_size_),
|
||||
allocated_(other.allocated_) {
|
||||
other.current_buffer_ = nullptr;
|
||||
@ -56,6 +67,7 @@ MonotonicBufferResource &MonotonicBufferResource::operator=(
|
||||
Release();
|
||||
memory_ = other.memory_;
|
||||
current_buffer_ = other.current_buffer_;
|
||||
initial_buffer_ = other.initial_buffer_;
|
||||
initial_size_ = other.initial_size_;
|
||||
allocated_ = other.allocated_;
|
||||
other.current_buffer_ = nullptr;
|
||||
@ -96,14 +108,26 @@ void *MonotonicBufferResource::DoAllocate(size_t bytes, size_t alignment) {
|
||||
allocated_ = 0;
|
||||
};
|
||||
|
||||
if (!current_buffer_) push_current_buffer(initial_size_);
|
||||
char *buffer_head = current_buffer_->data() + allocated_;
|
||||
char *data = nullptr;
|
||||
size_t data_capacity = 0U;
|
||||
if (current_buffer_) {
|
||||
data = current_buffer_->data();
|
||||
data_capacity = current_buffer_->capacity;
|
||||
} else if (initial_buffer_) {
|
||||
data = reinterpret_cast<char *>(initial_buffer_);
|
||||
data_capacity = initial_size_;
|
||||
} else { // missing current_buffer_ and initial_buffer_
|
||||
push_current_buffer(initial_size_);
|
||||
data = current_buffer_->data();
|
||||
data_capacity = current_buffer_->capacity;
|
||||
}
|
||||
char *buffer_head = data + allocated_;
|
||||
void *aligned_ptr = buffer_head;
|
||||
size_t available = current_buffer_->capacity - allocated_;
|
||||
size_t available = data_capacity - allocated_;
|
||||
if (!std::align(alignment, bytes, aligned_ptr, available)) {
|
||||
// Not enough memory, so allocate a new block with aligned data.
|
||||
push_current_buffer(next_buffer_size_);
|
||||
aligned_ptr = buffer_head = current_buffer_->data();
|
||||
aligned_ptr = buffer_head = data = current_buffer_->data();
|
||||
next_buffer_size_ = GrowMonotonicBuffer(
|
||||
next_buffer_size_, std::numeric_limits<size_t>::max() - sizeof(Buffer));
|
||||
}
|
||||
@ -111,8 +135,7 @@ void *MonotonicBufferResource::DoAllocate(size_t bytes, size_t alignment) {
|
||||
throw BadAlloc("Allocation alignment overflow");
|
||||
if (reinterpret_cast<char *>(aligned_ptr) + bytes <= aligned_ptr)
|
||||
throw BadAlloc("Allocation size overflow");
|
||||
allocated_ =
|
||||
reinterpret_cast<char *>(aligned_ptr) - current_buffer_->data() + bytes;
|
||||
allocated_ = reinterpret_cast<char *>(aligned_ptr) - data + bytes;
|
||||
return aligned_ptr;
|
||||
}
|
||||
|
||||
|
@ -274,6 +274,16 @@ class MonotonicBufferResource final : public MemoryResource {
|
||||
/// use the `memory` as the upstream resource.
|
||||
MonotonicBufferResource(size_t initial_size, MemoryResource *memory);
|
||||
|
||||
/// Construct the resource with the initial buffer set to `buffer` of given
|
||||
/// `buffer_size`.
|
||||
///
|
||||
/// This memory resource does not take ownership of `buffer` and will
|
||||
/// therefore not be freed in the destructor nor `Release` call. The `Release`
|
||||
/// call will indeed setup the `buffer` for reuse. Additional buffers are
|
||||
/// allocated from the given upstream `memory` resource.
|
||||
MonotonicBufferResource(void *buffer, size_t buffer_size,
|
||||
MemoryResource *memory = utils::NewDeleteResource());
|
||||
|
||||
MonotonicBufferResource(const MonotonicBufferResource &) = delete;
|
||||
MonotonicBufferResource &operator=(const MonotonicBufferResource &) = delete;
|
||||
|
||||
@ -300,6 +310,7 @@ class MonotonicBufferResource final : public MemoryResource {
|
||||
|
||||
MemoryResource *memory_{NewDeleteResource()};
|
||||
Buffer *current_buffer_{nullptr};
|
||||
void *initial_buffer_{nullptr};
|
||||
size_t initial_size_{0U};
|
||||
size_t next_buffer_size_{initial_size_};
|
||||
size_t allocated_{0U};
|
||||
|
@ -147,6 +147,45 @@ TEST(MonotonicBufferResource, AllocationWithSizeOverflow) {
|
||||
EXPECT_THROW(mem.Allocate(max_size, 4), std::bad_alloc);
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST(MonotonicBufferResource, AllocationWithInitialBufferOnStack) {
|
||||
TestMemory test_mem;
|
||||
constexpr size_t stack_data_size = 1024;
|
||||
char stack_data[stack_data_size];
|
||||
memset(stack_data, 0x42, stack_data_size);
|
||||
utils::MonotonicBufferResource mem(&stack_data[0], stack_data_size,
|
||||
&test_mem);
|
||||
{
|
||||
char *ptr = reinterpret_cast<char *>(CheckAllocation(&mem, 1, 1));
|
||||
EXPECT_EQ(&stack_data[0], ptr);
|
||||
EXPECT_EQ(test_mem.new_count_, 0);
|
||||
}
|
||||
{
|
||||
char *ptr = reinterpret_cast<char *>(CheckAllocation(&mem, 1023, 1));
|
||||
EXPECT_EQ(&stack_data[1], ptr);
|
||||
EXPECT_EQ(test_mem.new_count_, 0);
|
||||
}
|
||||
CheckAllocation(&mem, 1);
|
||||
EXPECT_EQ(test_mem.new_count_, 1);
|
||||
mem.Release();
|
||||
// We will once more allocate from stack so reset it.
|
||||
memset(stack_data, 0x42, stack_data_size);
|
||||
EXPECT_EQ(test_mem.delete_count_, 1);
|
||||
{
|
||||
char *ptr = reinterpret_cast<char *>(CheckAllocation(&mem, 1024, 1));
|
||||
EXPECT_EQ(&stack_data[0], ptr);
|
||||
EXPECT_EQ(test_mem.new_count_, 1);
|
||||
}
|
||||
mem.Release();
|
||||
// Next allocation doesn't fit to stack so no need to reset it.
|
||||
EXPECT_EQ(test_mem.delete_count_, 1);
|
||||
{
|
||||
char *ptr = reinterpret_cast<char *>(CheckAllocation(&mem, 1025, 1));
|
||||
EXPECT_NE(&stack_data[0], ptr);
|
||||
EXPECT_EQ(test_mem.new_count_, 2);
|
||||
}
|
||||
}
|
||||
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
class ContainerWithAllocatorLast final {
|
||||
public:
|
||||
|
Loading…
Reference in New Issue
Block a user