From e5e37bc14adee12ccad0701a50104570ab9ad9bf Mon Sep 17 00:00:00 2001 From: Antonio Filipovic <61245998+antoniofilipovic@users.noreply.github.com> Date: Thu, 22 Dec 2022 19:38:48 +0100 Subject: [PATCH 1/5] Fix LOAD CSV large memory usage (#712) --- src/query/interpreter.cpp | 2 +- src/query/plan/operator.cpp | 29 ++++++++++++++--------------- src/utils/csv_parsing.cpp | 2 +- src/utils/memory.cpp | 15 ++++++++------- src/utils/memory.hpp | 4 +++- tests/unit/utils_memory.cpp | 13 +++++++------ 6 files changed, 34 insertions(+), 31 deletions(-) diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index d5f156bc4..37672cca7 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -1044,7 +1044,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *strea // Also, we want to throw only when the query engine requests more memory and not the storage // so we add the exception to the allocator. // TODO (mferencevic): Tune the parameters accordingly. - utils::PoolResource pool_memory(128, 1024, &monotonic_memory); + utils::PoolResource pool_memory(128, 1024, &monotonic_memory, utils::NewDeleteResource()); std::optional<utils::LimitedMemoryResource> maybe_limited_resource; if (memory_limit_) { diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index 07ba7c877..debbc36bc 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -4528,24 +4528,24 @@ auto ToOptionalString(ExpressionEvaluator *evaluator, Expression *expression) -> return std::nullopt; }; -TypedValue CsvRowToTypedList(csv::Reader::Row row) { +TypedValue CsvRowToTypedList(csv::Reader::Row &row) { auto *mem = row.get_allocator().GetMemoryResource(); auto typed_columns = utils::pmr::vector<TypedValue>(mem); typed_columns.reserve(row.size()); for (auto &column : row) { typed_columns.emplace_back(std::move(column)); } - return TypedValue(typed_columns, mem); + return {std::move(typed_columns), mem}; } -TypedValue CsvRowToTypedMap(csv::Reader::Row row, csv::Reader::Header header) { +TypedValue CsvRowToTypedMap(csv::Reader::Row &row, csv::Reader::Header header) { // a valid row has the same number of elements as the header auto *mem = row.get_allocator().GetMemoryResource(); utils::pmr::map<utils::pmr::string, TypedValue> m(mem); for (auto i = 0; i < row.size(); ++i) { m.emplace(std::move(header[i]), std::move(row[i])); } - return TypedValue(m, mem); + return {std::move(m), mem}; } } // namespace @@ -4584,18 +4584,17 @@ class LoadCsvCursor : public Cursor { // have to read at most cardinality(n) rows (but we can read less and stop // pulling MATCH). if (!input_is_once_ && !input_pulled) return false; - - if (auto row = reader_->GetNextRow(context.evaluation_context.memory)) { - if (!reader_->HasHeader()) { - frame[self_->row_var_] = CsvRowToTypedList(std::move(*row)); - } else { - frame[self_->row_var_] = CsvRowToTypedMap( - std::move(*row), csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory)); - } - return true; + auto row = reader_->GetNextRow(context.evaluation_context.memory); + if (!row) { + return false; } - - return false; + if (!reader_->HasHeader()) { + frame[self_->row_var_] = CsvRowToTypedList(*row); + } else { + frame[self_->row_var_] = + CsvRowToTypedMap(*row, csv::Reader::Header(reader_->GetHeader(), context.evaluation_context.memory)); + } + return true; } void Reset() override { input_cursor_->Reset(); } diff --git a/src/utils/csv_parsing.cpp b/src/utils/csv_parsing.cpp index fc63bf9a4..49d8a0949 100644 --- a/src/utils/csv_parsing.cpp +++ b/src/utils/csv_parsing.cpp @@ -40,7 +40,7 @@ std::optional<utils::pmr::string> Reader::GetNextLine(utils::MemoryResource *mem return std::nullopt; } ++line_count_; - return line; + return std::move(line); } Reader::ParsingResult Reader::ParseHeader() { diff --git a/src/utils/memory.cpp b/src/utils/memory.cpp index 6f15183c0..f1cfca4e0 100644 --- a/src/utils/memory.cpp +++ b/src/utils/memory.cpp @@ -251,9 +251,10 @@ void Pool::Release() { } // namespace impl -PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory) - : pools_(memory), - unpooled_(memory), +PoolResource::PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools, + MemoryResource *memory_unpooled) + : pools_(memory_pools), + unpooled_(memory_unpooled), max_blocks_per_chunk_(std::min(max_blocks_per_chunk, static_cast<size_t>(impl::Pool::MaxBlocksInChunk()))), max_block_size_(max_block_size) { MG_ASSERT(max_blocks_per_chunk_ > 0U, "Invalid number of blocks per chunk"); @@ -273,14 +274,14 @@ void *PoolResource::DoAllocate(size_t bytes, size_t alignment) { if (block_size % alignment != 0) throw BadAlloc("Requested bytes must be a multiple of alignment"); if (block_size > max_block_size_) { // Allocate a big block. - BigBlock big_block{bytes, alignment, GetUpstreamResource()->Allocate(bytes, alignment)}; + BigBlock big_block{bytes, alignment, GetUpstreamResourceBlocks()->Allocate(bytes, alignment)}; // Insert the big block in the sorted position. auto it = std::lower_bound(unpooled_.begin(), unpooled_.end(), big_block, [](const auto &a, const auto &b) { return a.data < b.data; }); try { unpooled_.insert(it, big_block); } catch (...) { - GetUpstreamResource()->Deallocate(big_block.data, bytes, alignment); + GetUpstreamResourceBlocks()->Deallocate(big_block.data, bytes, alignment); throw; } return big_block.data; @@ -318,7 +319,7 @@ void PoolResource::DoDeallocate(void *p, size_t bytes, size_t alignment) { MG_ASSERT(it != unpooled_.end(), "Failed deallocation"); MG_ASSERT(it->data == p && it->bytes == bytes && it->alignment == alignment, "Failed deallocation"); unpooled_.erase(it); - GetUpstreamResource()->Deallocate(p, bytes, alignment); + GetUpstreamResourceBlocks()->Deallocate(p, bytes, alignment); return; } // Deallocate a regular block, first check if last_dealloc_pool_ is suitable. @@ -339,7 +340,7 @@ void PoolResource::Release() { for (auto &pool : pools_) pool.Release(); pools_.clear(); for (auto &big_block : unpooled_) - GetUpstreamResource()->Deallocate(big_block.data, big_block.bytes, big_block.alignment); + GetUpstreamResourceBlocks()->Deallocate(big_block.data, big_block.bytes, big_block.alignment); unpooled_.clear(); last_alloc_pool_ = nullptr; last_dealloc_pool_ = nullptr; diff --git a/src/utils/memory.hpp b/src/utils/memory.hpp index 92f766ac1..62b1f1d17 100644 --- a/src/utils/memory.hpp +++ b/src/utils/memory.hpp @@ -469,7 +469,8 @@ class PoolResource final : public MemoryResource { /// impl::Pool::MaxBlocksInChunk()) as the real maximum number of blocks per /// chunk. Allocation requests exceeding max_block_size are simply forwarded /// to upstream memory. - PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory = NewDeleteResource()); + PoolResource(size_t max_blocks_per_chunk, size_t max_block_size, MemoryResource *memory_pools = NewDeleteResource(), + MemoryResource *memory_unpooled = NewDeleteResource()); PoolResource(const PoolResource &) = delete; PoolResource &operator=(const PoolResource &) = delete; @@ -480,6 +481,7 @@ class PoolResource final : public MemoryResource { ~PoolResource() override { Release(); } MemoryResource *GetUpstreamResource() const { return pools_.get_allocator().GetMemoryResource(); } + MemoryResource *GetUpstreamResourceBlocks() const { return unpooled_.get_allocator().GetMemoryResource(); } /// Release all allocated memory. void Release(); diff --git a/tests/unit/utils_memory.cpp b/tests/unit/utils_memory.cpp index 70bf85653..983d09acb 100644 --- a/tests/unit/utils_memory.cpp +++ b/tests/unit/utils_memory.cpp @@ -252,20 +252,21 @@ TEST(PoolResource, MultipleSmallBlockAllocations) { // NOLINTNEXTLINE(hicpp-special-member-functions) TEST(PoolResource, BigBlockAllocations) { TestMemory test_mem; + TestMemory test_mem_unpooled; const size_t max_blocks_per_chunk = 3U; const size_t max_block_size = 64U; - memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem); + memgraph::utils::PoolResource mem(max_blocks_per_chunk, max_block_size, &test_mem, &test_mem_unpooled); CheckAllocation(&mem, max_block_size + 1, 1U); // May allocate more than once per block due to bookkeeping. - EXPECT_GE(test_mem.new_count_, 1U); + EXPECT_GE(test_mem_unpooled.new_count_, 1U); CheckAllocation(&mem, max_block_size + 1, 1U); - EXPECT_GE(test_mem.new_count_, 2U); + EXPECT_GE(test_mem_unpooled.new_count_, 2U); auto *ptr = CheckAllocation(&mem, max_block_size * 2, 1U); - EXPECT_GE(test_mem.new_count_, 3U); + EXPECT_GE(test_mem_unpooled.new_count_, 3U); mem.Deallocate(ptr, max_block_size * 2, 1U); - EXPECT_GE(test_mem.delete_count_, 1U); + EXPECT_GE(test_mem_unpooled.delete_count_, 1U); mem.Release(); - EXPECT_GE(test_mem.delete_count_, 3U); + EXPECT_GE(test_mem_unpooled.delete_count_, 3U); CheckAllocation(&mem, max_block_size + 1, 1U); } From d72e7fa38d3a533afc630f5a6646fea474a4811b Mon Sep 17 00:00:00 2001 From: Ante Javor <javor.ante@gmail.com> Date: Fri, 23 Dec 2022 10:08:52 +0100 Subject: [PATCH 2/5] Fix mgp.py create edge type hint and comment (#724) --- include/mgp.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/include/mgp.py b/include/mgp.py index 6cef5a451..52df6c332 100644 --- a/include/mgp.py +++ b/include/mgp.py @@ -1283,7 +1283,7 @@ class Graph: raise InvalidContextError() self._graph.detach_delete_vertex(vertex._vertex) - def create_edge(self, from_vertex: Vertex, to_vertex: Vertex, edge_type: EdgeType) -> None: + def create_edge(self, from_vertex: Vertex, to_vertex: Vertex, edge_type: EdgeType) -> Edge: """ Create an edge. @@ -1292,13 +1292,16 @@ class Graph: to_vertex: `Vertex' to where edge is directed. edge_type: `EdgeType` defines the type of edge. + Returns: + Created `Edge`. + Raises: ImmutableObjectError: If `graph` is immutable. UnableToAllocateError: If unable to allocate an edge. DeletedObjectError: If `from_vertex` or `to_vertex` has been deleted. SerializationError: If `from_vertex` or `to_vertex` has been modified by another transaction. Examples: - ```graph.create_edge(from_vertex, vertex, edge_type)``` + ```edge = graph.create_edge(from_vertex, vertex, edge_type)``` """ if not self.is_valid(): raise InvalidContextError() From 1f2a15e7c89be8b70c235c64c2bb4a595abb02b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Sa=C4=8Dari=C4=87?= <bruno.sacaric@gmail.com> Date: Fri, 23 Dec 2022 14:47:12 +0100 Subject: [PATCH 3/5] Fix MATCH not allowed on replica (#709) --- src/query/plan/read_write_type_checker.cpp | 19 +++++++++---- src/query/plan/read_write_type_checker.hpp | 3 +- .../unit/query_plan_read_write_typecheck.cpp | 28 +++++++++++++++++++ 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/query/plan/read_write_type_checker.cpp b/src/query/plan/read_write_type_checker.cpp index 94252eb3f..851465c56 100644 --- a/src/query/plan/read_write_type_checker.cpp +++ b/src/query/plan/read_write_type_checker.cpp @@ -91,18 +91,25 @@ bool ReadWriteTypeChecker::PreVisit([[maybe_unused]] Foreach &op) { bool ReadWriteTypeChecker::Visit(Once &) { return false; } // NOLINT(hicpp-named-parameter) void ReadWriteTypeChecker::UpdateType(RWType op_type) { - // Update type only if it's not the NONE type and the current operator's type - // is different than the one that's currently inferred. - if (type != RWType::NONE && type != op_type) { - type = RWType::RW; - } // Stop inference because RW is the most "dominant" type, i.e. it isn't // affected by the type of nodes in the plan appearing after the node for // which the type is set to RW. if (type == RWType::RW) { return; } - if (type == RWType::NONE && op_type != RWType::NONE) { + + // if op_type is NONE, type doesn't change. + if (op_type == RWType::NONE) { + return; + } + + // Update type only if it's not the NONE type and the current operator's type + // is different than the one that's currently inferred. + if (type != RWType::NONE && type != op_type) { + type = RWType::RW; + } + + if (type == RWType::NONE) { type = op_type; } } diff --git a/src/query/plan/read_write_type_checker.hpp b/src/query/plan/read_write_type_checker.hpp index 42d9569b0..34222c52c 100644 --- a/src/query/plan/read_write_type_checker.hpp +++ b/src/query/plan/read_write_type_checker.hpp @@ -15,7 +15,7 @@ namespace memgraph::query::plan { -class ReadWriteTypeChecker : public virtual HierarchicalLogicalOperatorVisitor { +struct ReadWriteTypeChecker : public virtual HierarchicalLogicalOperatorVisitor { public: ReadWriteTypeChecker() = default; @@ -89,7 +89,6 @@ class ReadWriteTypeChecker : public virtual HierarchicalLogicalOperatorVisitor { bool Visit(Once &) override; - private: void UpdateType(RWType op_type); }; diff --git a/tests/unit/query_plan_read_write_typecheck.cpp b/tests/unit/query_plan_read_write_typecheck.cpp index 673620ff9..7f84e9b16 100644 --- a/tests/unit/query_plan_read_write_typecheck.cpp +++ b/tests/unit/query_plan_read_write_typecheck.cpp @@ -253,3 +253,31 @@ TEST_F(ReadWriteTypeCheckTest, Foreach) { std::shared_ptr<LogicalOperator> foreach = std::make_shared<plan::Foreach>(nullptr, nullptr, nullptr, x); CheckPlanType(foreach.get(), RWType::RW); } + +TEST_F(ReadWriteTypeCheckTest, CheckUpdateType) { + std::array<std::array<RWType, 3>, 16> scenarios = {{ + {RWType::NONE, RWType::NONE, RWType::NONE}, + {RWType::NONE, RWType::R, RWType::R}, + {RWType::NONE, RWType::W, RWType::W}, + {RWType::NONE, RWType::RW, RWType::RW}, + {RWType::R, RWType::NONE, RWType::R}, + {RWType::R, RWType::R, RWType::R}, + {RWType::R, RWType::W, RWType::RW}, + {RWType::R, RWType::RW, RWType::RW}, + {RWType::W, RWType::NONE, RWType::W}, + {RWType::W, RWType::R, RWType::RW}, + {RWType::W, RWType::W, RWType::W}, + {RWType::W, RWType::RW, RWType::RW}, + {RWType::RW, RWType::NONE, RWType::RW}, + {RWType::RW, RWType::R, RWType::RW}, + {RWType::RW, RWType::W, RWType::RW}, + {RWType::RW, RWType::RW, RWType::RW}, + }}; + + auto rw_type_checker = ReadWriteTypeChecker(); + for (auto scenario : scenarios) { + rw_type_checker.type = scenario[0]; + rw_type_checker.UpdateType(scenario[1]); + EXPECT_EQ(scenario[2], rw_type_checker.type); + } +} From eda5213d950e4cf4f8c81d6eee8963fa0596ffa6 Mon Sep 17 00:00:00 2001 From: Katarina Supe <61758502+katarinasupe@users.noreply.github.com> Date: Sat, 24 Dec 2022 09:33:53 +0100 Subject: [PATCH 4/5] Release pypi mgp 1.1.1 (#727) --- release/mgp/.gitignore | 1 + release/mgp/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/release/mgp/.gitignore b/release/mgp/.gitignore index 670cd832d..43491458b 100644 --- a/release/mgp/.gitignore +++ b/release/mgp/.gitignore @@ -1,3 +1,4 @@ .venv dist mgp.py +poetry.lock diff --git a/release/mgp/pyproject.toml b/release/mgp/pyproject.toml index 137eca123..b882f023c 100644 --- a/release/mgp/pyproject.toml +++ b/release/mgp/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "mgp" -version = "1.1.0" +version = "1.1.1" description = "Memgraph's module for developing MAGE modules. Used only for type hinting!" authors = [ "katarinasupe <katarina.supe@memgraph.io>", From 8b834c702cee2c9a20c8366c436aa0953dbfd5e1 Mon Sep 17 00:00:00 2001 From: Ante Javor <javor.ante@gmail.com> Date: Sat, 14 Jan 2023 16:11:49 +0100 Subject: [PATCH 5/5] Update mgbench to run Diff workflow under 30mins (#730) --- tests/mgbench/benchmark.py | 65 +++++++++++++++++++++--------------- tests/mgbench/graph_bench.py | 2 ++ 2 files changed, 41 insertions(+), 26 deletions(-) diff --git a/tests/mgbench/benchmark.py b/tests/mgbench/benchmark.py index 743f4e92c..f08d6e3fa 100755 --- a/tests/mgbench/benchmark.py +++ b/tests/mgbench/benchmark.py @@ -142,7 +142,7 @@ parser.add_argument( with the presence of 300 write queries from write type or 30%""", ) -parser.add_argument("--tail-latency", type=int, default=100, help="Number of queries for the tail latency statistics") +parser.add_argument("--tail-latency", type=int, default=0, help="Number of queries for the tail latency statistics") parser.add_argument( "--performance-tracking", @@ -223,8 +223,17 @@ def filter_benchmarks(generators, patterns): patterns, ): current[group].append((query_name, query_func)) - if len(current) > 0: - filtered.append((generator(variant, args.vendor_name), dict(current))) + if len(current) == 0: + continue + + # Ignore benchgraph "basic" queries in standard CI/CD run + for pattern in patterns: + res = pattern.count("*") + key = "basic" + if res >= 2 and key in current.keys(): + current.pop(key) + + filtered.append((generator(variant, args.vendor_name), dict(current))) return filtered @@ -241,30 +250,34 @@ def warmup(client): def tail_latency(vendor, client, func): - vendor.start_benchmark("tail_latency") - if args.warmup_run: - warmup(client) - latency = [] iteration = args.tail_latency - query_list = get_queries(func, iteration) - for i in range(0, iteration): - ret = client.execute(queries=[query_list[i]], num_workers=1) - latency.append(ret[0]["duration"]) - latency.sort() - query_stats = { - "iterations": iteration, - "min": latency[0], - "max": latency[iteration - 1], - "mean": statistics.mean(latency), - "p99": latency[math.floor(iteration * 0.99) - 1], - "p95": latency[math.floor(iteration * 0.95) - 1], - "p90": latency[math.floor(iteration * 0.90) - 1], - "p75": latency[math.floor(iteration * 0.75) - 1], - "p50": latency[math.floor(iteration * 0.50) - 1], - } - print("Query statistics for tail latency: ") - print(query_stats) - vendor.stop("tail_latency") + if iteration >= 10: + vendor.start_benchmark("tail_latency") + if args.warmup_run: + warmup(client) + latency = [] + + query_list = get_queries(func, iteration) + for i in range(0, iteration): + ret = client.execute(queries=[query_list[i]], num_workers=1) + latency.append(ret[0]["duration"]) + latency.sort() + query_stats = { + "iterations": iteration, + "min": latency[0], + "max": latency[iteration - 1], + "mean": statistics.mean(latency), + "p99": latency[math.floor(iteration * 0.99) - 1], + "p95": latency[math.floor(iteration * 0.95) - 1], + "p90": latency[math.floor(iteration * 0.90) - 1], + "p75": latency[math.floor(iteration * 0.75) - 1], + "p50": latency[math.floor(iteration * 0.50) - 1], + } + print("Query statistics for tail latency: ") + print(query_stats) + vendor.stop("tail_latency") + else: + query_stats = {} return query_stats diff --git a/tests/mgbench/graph_bench.py b/tests/mgbench/graph_bench.py index 8173a4a54..d1a633081 100644 --- a/tests/mgbench/graph_bench.py +++ b/tests/mgbench/graph_bench.py @@ -147,6 +147,8 @@ def run_full_benchmarks(vendor, binary, dataset_size, dataset_group, realistic, "12", "--no-authorization", "pokec/" + dataset_size + "/" + dataset_group + "/*", + "--tail-latency", + "100", ] for config in configurations: