From e5e37bc14adee12ccad0701a50104570ab9ad9bf Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Thu, 22 Dec 2022 19:38:48 +0100 Subject: [PATCH] Fix LOAD CSV large memory usage (#712) --- src/query/interpreter.cpp | 2 +- src/query/plan/operator.cpp | 29 ++++++++++++++--------------- src/utils/csv_parsing.cpp | 2 +- src/utils/memory.cpp | 15 ++++++++------- src/utils/memory.hpp | 4 +++- tests/unit/utils_memory.cpp | 13 +++++++------ 6 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index d5f156bc4..37672cca7 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -1044,7 +1044,7 @@ std::optional PullPlan::Pull(AnyStream *strea // Also, we want to throw only when the query engine requests more memory and not the storage // so we add the exception to the allocator. // TODO (mferencevic): Tune the parameters accordingly. - utils::PoolResource pool_memory(128, 1024, &monotonic_memory); + utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource()); std::optional maybe_limited_resource; if (memory_limit_) { diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 07ba7c877..debbc36bc 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -4528,24 +4528,24 @@ auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> return std::nullopt; }; -TypedValue CsvRowToTypedList(csv::Reader::Row row) { +TypedValue CsvRowToTypedList(csv::Reader::Row &row) { auto *mem = row.get_allocator().GetMemoryResource(); auto typed_columns = utils::pmr::vector(mem); typed_columns.reserve(row.size()); for (auto &column : row) { typed_columns.emplace_back(std::move(column)); } - return TypedValue(typed_columns, mem); + return {std::move(typed_columns), mem}; } -TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header) { +TypedValue CsvRowToTypedMap(csv::Reader::Row &row, csv::Reader::Header header) { // a valid row has the same number of elements as the header auto *mem = row.get_allocator().GetMemoryResource(); utils::pmr::map m(mem); for (auto i = 0; i < row.size(); ++i) { m.emplace(std::move(header[i]), std::move(row[i])); } - return TypedValue(m, mem); + return {std::move(m), mem}; } } // namespace @@ -4584,18 +4584,17 @@ class LoadCsvCursor : public Cursor { // have to read at most cardinality(n) rows (but we can read less and stop // pulling MATCH). if (!input_is_once_ && !input_pulled) return false; - - if (auto row = reader_->GetNextRow(context.evaluation_context.memory)) { - if (!reader_->HasHeader()) { - frame[self_->row_var_] = CsvRowToTypedList(std::move(*row)); - } else { - frame[self_->row_var_] = CsvRowToTypedMap( - std::move(*row), csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory)); - } - return true; + auto row = reader_->GetNextRow(context.evaluation_context.memory); + if (!row) { + return false; } - - return false; + if (!reader_->HasHeader()) { + frame[self_->row_var_] = CsvRowToTypedList(*row); + } else { + frame[self_->row_var_] = + CsvRowToTypedMap(*row, csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory)); + } + return true; } void Reset() override { input_cursor_->Reset(); } diff --git a/src/utils/csv_parsing.cpp b/src/utils/csv_parsing.cpp index fc63bf9a4..49d8a0949 100644 --- a/src/utils/csv_parsing.cpp +++ b/src/utils/csv_parsing.cpp @@ -40,7 +40,7 @@ std::optional Reader::GetNextLine(utils::MemoryResource *mem return std::nullopt; } ++line_count_; - return line; + return std::move(line); } Reader::ParsingResult Reader::ParseHeader() { diff --git a/src/utils/memory.cpp b/src/utils/memory.cpp index 6f15183c0..f1cfca4e0 100644 --- a/src/utils/memory.cpp +++ b/src/utils/memory.cpp @@ -251,9 +251,10 @@ void Pool::Release() { } // namespace impl -PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory) - : pools_(memory), - unpooled_(memory), +PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools, + MemoryResource *memory_unpooled) + : pools_(memory_pools), + unpooled_(memory_unpooled), max_blocks_per_chunk_(std::min(max_blocks_per_chunk, static_cast(impl::Pool::MaxBlocksInChunk()))), max_block_size_(max_block_size) { MG_ASSERT(max_blocks_per_chunk_ > 0U, "Invalid number of blocks per chunk"); @@ -273,14 +274,14 @@ void *PoolResource::DoAllocate(size_t bytes, size_t alignment) { if (block_size % alignment != 0) throw BadAlloc("Requested bytes must be a multiple of alignment"); if (block_size > max_block_size_) { // Allocate a big block. - BigBlock big_block{bytes, alignment, GetUpstreamResource()->Allocate(bytes, alignment)}; + BigBlock big_block{bytes, alignment, GetUpstreamResourceBlocks()->Allocate(bytes, alignment)}; // Insert the big block in the sorted position. auto it = std::lower_bound(unpooled_.begin(), unpooled_.end(), big_block, [](const auto &a, const auto &b) { return a.data < b.data; }); try { unpooled_.insert(it, big_block); } catch (...) { - GetUpstreamResource()->Deallocate(big_block.data, bytes, alignment); + GetUpstreamResourceBlocks()->Deallocate(big_block.data, bytes, alignment); throw; } return big_block.data; @@ -318,7 +319,7 @@ void PoolResource::DoDeallocate(void *p, size_t bytes, size_t alignment) { MG_ASSERT(it != unpooled_.end(), "Failed deallocation"); MG_ASSERT(it->data == p && it->bytes == bytes && it->alignment == alignment, "Failed deallocation"); unpooled_.erase(it); - GetUpstreamResource()->Deallocate(p, bytes, alignment); + GetUpstreamResourceBlocks()->Deallocate(p, bytes, alignment); return; } // Deallocate a regular block, first check if last_dealloc_pool_ is suitable. @@ -339,7 +340,7 @@ void PoolResource::Release() { for (auto &pool : pools_) pool.Release(); pools_.clear(); for (auto &big_block : unpooled_) - GetUpstreamResource()->Deallocate(big_block.data, big_block.bytes, big_block.alignment); + GetUpstreamResourceBlocks()->Deallocate(big_block.data, big_block.bytes, big_block.alignment); unpooled_.clear(); last_alloc_pool_ = nullptr; last_dealloc_pool_ = nullptr; diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp index 92f766ac1..62b1f1d17 100644 --- a/src/utils/memory.hpp +++ b/src/utils/memory.hpp @@ -469,7 +469,8 @@ class PoolResource final : public MemoryResource { /// impl::Pool::MaxBlocksInChunk()) as the real maximum number of blocks per /// chunk. Allocation requests exceeding max_block_size are simply forwarded /// to upstream memory. - PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory = NewDeleteResource()); + PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools = NewDeleteResource(), + MemoryResource *memory_unpooled = NewDeleteResource()); PoolResource(const PoolResource &) = delete; PoolResource &operator=(const PoolResource &) = delete; @@ -480,6 +481,7 @@ class PoolResource final : public MemoryResource { ~PoolResource() override { Release(); } MemoryResource *GetUpstreamResource() const { return pools_.get_allocator().GetMemoryResource(); } + MemoryResource *GetUpstreamResourceBlocks() const { return unpooled_.get_allocator().GetMemoryResource(); } /// Release all allocated memory. void Release(); diff --git a/tests/unit/utils_memory.cpp b/tests/unit/utils_memory.cpp index 70bf85653..983d09acb 100644 --- a/tests/unit/utils_memory.cpp +++ b/tests/unit/utils_memory.cpp @@ -252,20 +252,21 @@ TEST(PoolResource, MultipleSmallBlockAllocations) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST(PoolResource, BigBlockAllocations) { TestMemory test_mem; + TestMemory test_mem_unpooled; const size_t max_blocks_per_chunk = 3U; const size_t max_block_size = 64U; - memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem); + memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem, &test_mem_unpooled); CheckAllocation(&mem, max_block_size + 1, 1U); // May allocate more than once per block due to bookkeeping. - EXPECT_GE(test_mem.new_count_, 1U); + EXPECT_GE(test_mem_unpooled.new_count_, 1U); CheckAllocation(&mem, max_block_size + 1, 1U); - EXPECT_GE(test_mem.new_count_, 2U); + EXPECT_GE(test_mem_unpooled.new_count_, 2U); auto *ptr = CheckAllocation(&mem, max_block_size * 2, 1U); - EXPECT_GE(test_mem.new_count_, 3U); + EXPECT_GE(test_mem_unpooled.new_count_, 3U); mem.Deallocate(ptr, max_block_size * 2, 1U); - EXPECT_GE(test_mem.delete_count_, 1U); + EXPECT_GE(test_mem_unpooled.delete_count_, 1U); mem.Release(); - EXPECT_GE(test_mem.delete_count_, 3U); + EXPECT_GE(test_mem_unpooled.delete_count_, 3U); CheckAllocation(&mem, max_block_size + 1, 1U); }