From 41bb988fe9e70a70dac418b16303866f8c4b6b81 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Thu, 12 Jan 2023 14:14:59 +0100 Subject: [PATCH] Fix failing benchmark tests and implement cursor The benchmarking tests were failing because of the incorrect implementation of the ScanAllByPrimaryKeyCursor. The previous implementation caused the currently allocateable 1m edgeids to run out very quickly, causing the the tests to freeze. --- src/query/v2/context.hpp | 2 +- src/query/v2/interpreter.cpp | 2 +- src/query/v2/plan/operator.cpp | 40 ++++++++++++++++--- src/query/v2/request_router.hpp | 37 +++++++++++++++-- tests/mgbench/client.cpp | 2 +- tests/simulation/request_router.cpp | 4 +- tests/simulation/test_cluster.hpp | 4 +- tests/unit/high_density_shard_create_scan.cpp | 4 +- tests/unit/machine_manager.cpp | 4 +- tests/unit/query_v2_expression_evaluator.cpp | 7 +++- 10 files changed, 84 insertions(+), 22 deletions(-) diff --git a/src/query/v2/context.hpp b/src/query/v2/context.hpp index cb30a9ced..58f5ada97 100644 --- a/src/query/v2/context.hpp +++ b/src/query/v2/context.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index fde11ac00..9a4bfde90 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 02abdc3c4..b7841bf16 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -34,6 +34,7 @@ #include "query/v2/bindings/eval.hpp" #include "query/v2/bindings/symbol_table.hpp" #include "query/v2/context.hpp" +#include "query/v2/conversions.hpp" #include "query/v2/db_accessor.hpp" #include "query/v2/exceptions.hpp" #include "query/v2/frontend/ast/ast.hpp" @@ -403,7 +404,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { if (label_.has_value()) { request_label = request_router.LabelToName(*label_); } - current_batch = request_router.ScanVertices(request_label); + current_batch = request_router.ScanVertices(request_label, std::nullopt); } current_vertex_it = current_batch.begin(); request_state_ = State::COMPLETED; @@ -468,13 +469,14 @@ class DistributedScanAllByPrimaryKeyCursor : public Cursor { Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name, std::optional label, std::optional> property_expression_pair, - std::optional> filter_expressions) + std::optional> filter_expressions, std::optional> primary_key) : output_symbol_(output_symbol), input_cursor_(std::move(input_cursor)), op_name_(op_name), label_(label), property_expression_pair_(property_expression_pair), - filter_expressions_(filter_expressions) { + filter_expressions_(filter_expressions), + primary_key_(primary_key) { ResetExecutionState(); } @@ -489,7 +491,32 @@ class DistributedScanAllByPrimaryKeyCursor : public Cursor { if (label_.has_value()) { request_label = request_router.LabelToName(*label_); } - current_batch_ = request_router.ScanVertices(request_label); + current_batch_ = request_router.ScanVertices(request_label, std::nullopt); + } + current_vertex_it_ = current_batch_.begin(); + request_state_ = State::COMPLETED; + return !current_batch_.empty(); + } + + bool MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) { + { + SCOPED_REQUEST_WAIT_PROFILE; + std::optional request_label = std::nullopt; + if (label_.has_value()) { + request_label = request_router.LabelToName(*label_); + } + + // Evaluate the expressions that hold the PrimaryKey. + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); + + std::vector pk; + MG_ASSERT(primary_key_); + for (auto primary_key : *primary_key_) { + pk.push_back(TypedValueToValue(primary_key->Accept(evaluator))); + } + + current_batch_ = request_router.ScanVertices(request_label, pk); } current_vertex_it_ = current_batch_.begin(); request_state_ = State::COMPLETED; @@ -612,6 +639,7 @@ class DistributedScanAllByPrimaryKeyCursor : public Cursor { std::optional label_; std::optional> property_expression_pair_; std::optional> filter_expressions_; + std::optional> primary_key_; std::optional own_multi_frames_; std::optional valid_frames_consumer_; ValidFramesConsumer::Iterator valid_frames_it_; @@ -727,8 +755,8 @@ UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource *mem) cons EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator); return MakeUniqueCursorPtr( - mem, output_symbol_, input_->MakeCursor(mem), "ScanAll", std::nullopt /*label*/, - std::nullopt /*property_expression_pair*/, std::nullopt /*filter_expressions*/); + mem, output_symbol_, input_->MakeCursor(mem), "ScanAll", label_, std::nullopt /*property_expression_pair*/, + std::nullopt /*filter_expressions*/, primary_key_); throw QueryRuntimeException("ScanAllByPrimaryKey cursur is yet to be implemented."); } diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index ca8d759f1..934190cb4 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -97,7 +97,8 @@ class RequestRouterInterface { virtual void StartTransaction() = 0; virtual void Commit() = 0; - virtual std::vector ScanVertices(std::optional label) = 0; + virtual std::vector ScanVertices(std::optional label, + std::optional> primary_key) = 0; virtual std::vector CreateVertices(std::vector new_vertices) = 0; virtual std::vector ExpandOne(msgs::ExpandOneRequest request) = 0; virtual std::vector CreateExpand(std::vector new_edges) = 0; @@ -243,9 +244,16 @@ class RequestRouter : public RequestRouterInterface { bool IsPrimaryLabel(storage::v3::LabelId label) const override { return shards_map_.label_spaces.contains(label); } // TODO(kostasrim) Simplify return result - std::vector ScanVertices(std::optional label) override { + std::vector ScanVertices(std::optional label, + std::optional> primary_key) override { // create requests - std::vector> requests_to_be_sent = RequestsForScanVertices(label); + std::vector> requests_to_be_sent; + if (primary_key) { + requests_to_be_sent = RequestsForScanVertexByPrimaryKey(label, *primary_key); + } else { + requests_to_be_sent = RequestsForScanVertices(label); + } + spdlog::trace("created {} ScanVertices requests", requests_to_be_sent.size()); // begin all requests in parallel @@ -510,6 +518,29 @@ class RequestRouter : public RequestRouterInterface { return requests; } + std::vector> RequestsForScanVertexByPrimaryKey( + const std::optional &label, const std::vector &primary_key) { + const auto label_id = shards_map_.GetLabelId(*label); + MG_ASSERT(label_id); + MG_ASSERT(IsPrimaryLabel(*label_id)); + std::vector> requests = {}; + + auto pk_containing_shard = + shards_map_.GetShardForKey(*label, storage::conversions::ConvertPropertyVector(primary_key)); + + msgs::ScanVerticesRequest request; + request.transaction_id = transaction_id_; + request.batch_limit = 1; + request.start_id.second = primary_key; + + ShardRequestState shard_request_state{ + .shard = pk_containing_shard, + .request = std::move(request), + }; + requests.emplace_back(std::move(shard_request_state)); + return requests; + } + std::vector> RequestsForExpandOne(const msgs::ExpandOneRequest &request) { std::map per_shard_request_table; msgs::ExpandOneRequest top_level_rqst_template = request; diff --git a/tests/mgbench/client.cpp b/tests/mgbench/client.cpp index e4b63d477..000c199fa 100644 --- a/tests/mgbench/client.cpp +++ b/tests/mgbench/client.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 4248e7876..10976baa9 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -154,7 +154,7 @@ void RunStorageRaft(Raft &request_router, std void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, ScanAll scan_all) { - auto results = request_router.ScanVertices("test_label"); + auto results = request_router.ScanVertices("test_label", std::nullopt); RC_ASSERT(results.size() == correctness_model.size()); diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp index 9fabf6ccc..a9647c795 100644 --- a/tests/unit/high_density_shard_create_scan.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -192,7 +192,7 @@ void ExecuteOp(query::v2::RequestRouter &request_router, std::se void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, ScanAll scan_all) { - auto results = request_router.ScanVertices("test_label"); + auto results = request_router.ScanVertices("test_label", std::nullopt); spdlog::error("got {} results, model size is {}", results.size(), correctness_model.size()); EXPECT_EQ(results.size(), correctness_model.size()); diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 74b7d3863..8d0670e81 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -111,7 +111,7 @@ ShardMap TestShardMap() { template void TestScanAll(RequestRouter &request_router) { - auto result = request_router.ScanVertices(kLabelName); + auto result = request_router.ScanVertices(kLabelName, std::nullopt); EXPECT_EQ(result.size(), 2); } diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index 5e91d0d5a..fb875a0f2 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -84,7 +84,10 @@ class MockedRequestRouter : public RequestRouterInterface { } void StartTransaction() override {} void Commit() override {} - std::vector ScanVertices(std::optional /* label */) override { return {}; } + std::vector ScanVertices(std::optional /* label */, + std::optional> /*primary_key*/) override { + return {}; + } std::vector CreateVertices( std::vector /* new_vertices */) override {