Fix LOAD CSV large memory usage (#712)
This commit is contained in:
parent
3ee068bbf9
commit
e5e37bc14a
@ -1044,7 +1044,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea
|
||||
// Also, we want to throw only when the query engine requests more memory and not the storage
|
||||
// so we add the exception to the allocator.
|
||||
// TODO (mferencevic): Tune the parameters accordingly.
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory);
|
||||
utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource());
|
||||
std::optional<utils::LimitedMemoryResource> maybe_limited_resource;
|
||||
|
||||
if (memory_limit_) {
|
||||
|
@ -4528,24 +4528,24 @@ auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) ->
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
TypedValue CsvRowToTypedList(csv::Reader::Row row) {
|
||||
TypedValue CsvRowToTypedList(csv::Reader::Row &row) {
|
||||
auto *mem = row.get_allocator().GetMemoryResource();
|
||||
auto typed_columns = utils::pmr::vector<TypedValue>(mem);
|
||||
typed_columns.reserve(row.size());
|
||||
for (auto &column : row) {
|
||||
typed_columns.emplace_back(std::move(column));
|
||||
}
|
||||
return TypedValue(typed_columns, mem);
|
||||
return {std::move(typed_columns), mem};
|
||||
}
|
||||
|
||||
TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header) {
|
||||
TypedValue CsvRowToTypedMap(csv::Reader::Row &row, csv::Reader::Header header) {
|
||||
// a valid row has the same number of elements as the header
|
||||
auto *mem = row.get_allocator().GetMemoryResource();
|
||||
utils::pmr::map<utils::pmr::string, TypedValue> m(mem);
|
||||
for (auto i = 0; i < row.size(); ++i) {
|
||||
m.emplace(std::move(header[i]), std::move(row[i]));
|
||||
}
|
||||
return TypedValue(m, mem);
|
||||
return {std::move(m), mem};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
@ -4584,18 +4584,17 @@ class LoadCsvCursor : public Cursor {
|
||||
// have to read at most cardinality(n) rows (but we can read less and stop
|
||||
// pulling MATCH).
|
||||
if (!input_is_once_ && !input_pulled) return false;
|
||||
|
||||
if (auto row = reader_->GetNextRow(context.evaluation_context.memory)) {
|
||||
if (!reader_->HasHeader()) {
|
||||
frame[self_->row_var_] = CsvRowToTypedList(std::move(*row));
|
||||
} else {
|
||||
frame[self_->row_var_] = CsvRowToTypedMap(
|
||||
std::move(*row), csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory));
|
||||
}
|
||||
return true;
|
||||
auto row = reader_->GetNextRow(context.evaluation_context.memory);
|
||||
if (!row) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return false;
|
||||
if (!reader_->HasHeader()) {
|
||||
frame[self_->row_var_] = CsvRowToTypedList(*row);
|
||||
} else {
|
||||
frame[self_->row_var_] =
|
||||
CsvRowToTypedMap(*row, csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
@ -40,7 +40,7 @@ std::optional<utils::pmr::string> Reader::GetNextLine(utils::MemoryResource *mem
|
||||
return std::nullopt;
|
||||
}
|
||||
++line_count_;
|
||||
return line;
|
||||
return std::move(line);
|
||||
}
|
||||
|
||||
Reader::ParsingResult Reader::ParseHeader() {
|
||||
|
@ -251,9 +251,10 @@ void Pool::Release() {
|
||||
|
||||
} // namespace impl
|
||||
|
||||
PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory)
|
||||
: pools_(memory),
|
||||
unpooled_(memory),
|
||||
PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools,
|
||||
MemoryResource *memory_unpooled)
|
||||
: pools_(memory_pools),
|
||||
unpooled_(memory_unpooled),
|
||||
max_blocks_per_chunk_(std::min(max_blocks_per_chunk, static_cast<size_t>(impl::Pool::MaxBlocksInChunk()))),
|
||||
max_block_size_(max_block_size) {
|
||||
MG_ASSERT(max_blocks_per_chunk_ > 0U, "Invalid number of blocks per chunk");
|
||||
@ -273,14 +274,14 @@ void *PoolResource::DoAllocate(size_t bytes, size_t alignment) {
|
||||
if (block_size % alignment != 0) throw BadAlloc("Requested bytes must be a multiple of alignment");
|
||||
if (block_size > max_block_size_) {
|
||||
// Allocate a big block.
|
||||
BigBlock big_block{bytes, alignment, GetUpstreamResource()->Allocate(bytes, alignment)};
|
||||
BigBlock big_block{bytes, alignment, GetUpstreamResourceBlocks()->Allocate(bytes, alignment)};
|
||||
// Insert the big block in the sorted position.
|
||||
auto it = std::lower_bound(unpooled_.begin(), unpooled_.end(), big_block,
|
||||
[](const auto &a, const auto &b) { return a.data < b.data; });
|
||||
try {
|
||||
unpooled_.insert(it, big_block);
|
||||
} catch (...) {
|
||||
GetUpstreamResource()->Deallocate(big_block.data, bytes, alignment);
|
||||
GetUpstreamResourceBlocks()->Deallocate(big_block.data, bytes, alignment);
|
||||
throw;
|
||||
}
|
||||
return big_block.data;
|
||||
@ -318,7 +319,7 @@ void PoolResource::DoDeallocate(void *p, size_t bytes, size_t alignment) {
|
||||
MG_ASSERT(it != unpooled_.end(), "Failed deallocation");
|
||||
MG_ASSERT(it->data == p && it->bytes == bytes && it->alignment == alignment, "Failed deallocation");
|
||||
unpooled_.erase(it);
|
||||
GetUpstreamResource()->Deallocate(p, bytes, alignment);
|
||||
GetUpstreamResourceBlocks()->Deallocate(p, bytes, alignment);
|
||||
return;
|
||||
}
|
||||
// Deallocate a regular block, first check if last_dealloc_pool_ is suitable.
|
||||
@ -339,7 +340,7 @@ void PoolResource::Release() {
|
||||
for (auto &pool : pools_) pool.Release();
|
||||
pools_.clear();
|
||||
for (auto &big_block : unpooled_)
|
||||
GetUpstreamResource()->Deallocate(big_block.data, big_block.bytes, big_block.alignment);
|
||||
GetUpstreamResourceBlocks()->Deallocate(big_block.data, big_block.bytes, big_block.alignment);
|
||||
unpooled_.clear();
|
||||
last_alloc_pool_ = nullptr;
|
||||
last_dealloc_pool_ = nullptr;
|
||||
|
@ -469,7 +469,8 @@ class PoolResource final : public MemoryResource {
|
||||
/// impl::Pool::MaxBlocksInChunk()) as the real maximum number of blocks per
|
||||
/// chunk. Allocation requests exceeding max_block_size are simply forwarded
|
||||
/// to upstream memory.
|
||||
PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory = NewDeleteResource());
|
||||
PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools = NewDeleteResource(),
|
||||
MemoryResource *memory_unpooled = NewDeleteResource());
|
||||
|
||||
PoolResource(const PoolResource &) = delete;
|
||||
PoolResource &operator=(const PoolResource &) = delete;
|
||||
@ -480,6 +481,7 @@ class PoolResource final : public MemoryResource {
|
||||
~PoolResource() override { Release(); }
|
||||
|
||||
MemoryResource *GetUpstreamResource() const { return pools_.get_allocator().GetMemoryResource(); }
|
||||
MemoryResource *GetUpstreamResourceBlocks() const { return unpooled_.get_allocator().GetMemoryResource(); }
|
||||
|
||||
/// Release all allocated memory.
|
||||
void Release();
|
||||
|
@ -252,20 +252,21 @@ TEST(PoolResource, MultipleSmallBlockAllocations) {
|
||||
// NOLINTNEXTLINE(hicpp-special-member-functions)
|
||||
TEST(PoolResource, BigBlockAllocations) {
|
||||
TestMemory test_mem;
|
||||
TestMemory test_mem_unpooled;
|
||||
const size_t max_blocks_per_chunk = 3U;
|
||||
const size_t max_block_size = 64U;
|
||||
memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem);
|
||||
memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem, &test_mem_unpooled);
|
||||
CheckAllocation(&mem, max_block_size + 1, 1U);
|
||||
// May allocate more than once per block due to bookkeeping.
|
||||
EXPECT_GE(test_mem.new_count_, 1U);
|
||||
EXPECT_GE(test_mem_unpooled.new_count_, 1U);
|
||||
CheckAllocation(&mem, max_block_size + 1, 1U);
|
||||
EXPECT_GE(test_mem.new_count_, 2U);
|
||||
EXPECT_GE(test_mem_unpooled.new_count_, 2U);
|
||||
auto *ptr = CheckAllocation(&mem, max_block_size * 2, 1U);
|
||||
EXPECT_GE(test_mem.new_count_, 3U);
|
||||
EXPECT_GE(test_mem_unpooled.new_count_, 3U);
|
||||
mem.Deallocate(ptr, max_block_size * 2, 1U);
|
||||
EXPECT_GE(test_mem.delete_count_, 1U);
|
||||
EXPECT_GE(test_mem_unpooled.delete_count_, 1U);
|
||||
mem.Release();
|
||||
EXPECT_GE(test_mem.delete_count_, 3U);
|
||||
EXPECT_GE(test_mem_unpooled.delete_count_, 3U);
|
||||
CheckAllocation(&mem, max_block_size + 1, 1U);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user