diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index a7faa7d22..321937956 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -271,3 +271,30 @@ jobs: source ve3/bin/activate cd e2e LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries + + - name: Run query performance tests + run: | + cd tests/manual + ./query_performance_runner.py + + - name: Get branch name (merge) + if: github.event_name != 'pull_request' + shell: bash + run: echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/} | tr / -)" >> $GITHUB_ENV + + - name: Get branch name (pull request) + if: github.event_name == 'pull_request' + shell: bash + run: echo "BRANCH_NAME=$(echo ${GITHUB_HEAD_REF} | tr / -)" >> $GITHUB_ENV + + - name: Upload macro benchmark results + run: | + cd tools/bench-graph-client + virtualenv -p python3 ve3 + source ve3/bin/activate + pip install -r requirements.txt + ./main.py --benchmark-name "query_performance" \ + --benchmark-results-path "../../build/tests/manual/query_performance_benchmark/summary.json" \ + --github-run-id "${{ github.run_id }}" \ + --github-run-number "${{ github.run_number }}" \ + --head-branch-name "${{ env.BRANCH_NAME }}" diff --git a/src/expr/ast/cypher_main_visitor.hpp b/src/expr/ast/cypher_main_visitor.hpp index 17d2167b2..bbe5d2d06 100644 --- a/src/expr/ast/cypher_main_visitor.hpp +++ b/src/expr/ast/cypher_main_visitor.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -51,7 +51,7 @@ constexpr char kId[] = "ID"; namespace MG_INJECTED_NAMESPACE_NAME { namespace detail { -using antlropencypher::MemgraphCypher; +using antlropencypher::v2::MemgraphCypher; template std::optional> VisitMemoryLimit(MemgraphCypher::MemoryLimitContext *memory_limit_ctx, @@ -211,13 +211,13 @@ inline std::string_view ToString(const PulsarConfigKey key) { } } // namespace detail -using antlropencypher::MemgraphCypher; +using antlropencypher::v2::MemgraphCypher; struct ParsingContext { bool is_query_cached = false; }; -class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { +class CypherMainVisitor : public antlropencypher::v2::MemgraphCypherBaseVisitor { public: explicit CypherMainVisitor(ParsingContext context, AstStorage *storage) : context_(context), storage_(storage) {} diff --git a/src/io/future.hpp b/src/io/future.hpp index 585f18938..99906f2de 100644 --- a/src/io/future.hpp +++ b/src/io/future.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -120,7 +120,7 @@ class Shared { MG_ASSERT(!consumed_, "Promise filled after it was already consumed!"); MG_ASSERT(!filled_, "Promise filled twice!"); - item_ = item; + item_ = std::move(item); filled_ = true; } // lock released before condition variable notification @@ -235,7 +235,7 @@ class Promise { // Fill the expected item into the Future. void Fill(T item) { MG_ASSERT(!filled_or_moved_, "Promise::Fill called on a promise that is already filled or moved!"); - shared_->Fill(item); + shared_->Fill(std::move(item)); filled_or_moved_ = true; } diff --git a/src/io/local_transport/local_transport.hpp b/src/io/local_transport/local_transport.hpp index b64cabf1d..f17e5c059 100644 --- a/src/io/local_transport/local_transport.hpp +++ b/src/io/local_transport/local_transport.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -30,10 +30,10 @@ class LocalTransport { explicit LocalTransport(std::shared_ptr local_transport_handle) : local_transport_handle_(std::move(local_transport_handle)) {} - template - ResponseFuture Request(Address to_address, Address from_address, RequestT request, + template + ResponseFuture Request(Address to_address, Address from_address, RValueRef request, std::function fill_notifier, Duration timeout) { - return local_transport_handle_->template SubmitRequest( + return local_transport_handle_->template SubmitRequest( to_address, from_address, std::move(request), timeout, fill_notifier); } @@ -43,8 +43,8 @@ class LocalTransport { } template - void Send(Address to_address, Address from_address, RequestId request_id, M &&message) { - return local_transport_handle_->template Send(to_address, from_address, request_id, std::forward(message)); + void Send(Address to_address, Address from_address, RequestId request_id, RValueRef message) { + return local_transport_handle_->template Send(to_address, from_address, request_id, std::move(message)); } Time Now() const { return local_transport_handle_->Now(); } diff --git a/src/io/local_transport/local_transport_handle.hpp b/src/io/local_transport/local_transport_handle.hpp index 38538620f..cd19a3977 100644 --- a/src/io/local_transport/local_transport_handle.hpp +++ b/src/io/local_transport/local_transport_handle.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -104,10 +104,10 @@ class LocalTransportHandle { } template - void Send(Address to_address, Address from_address, RequestId request_id, M &&message) { + void Send(Address to_address, Address from_address, RequestId request_id, RValueRef message) { auto type_info = TypeInfoFor(message); - std::any message_any(std::forward(message)); + std::any message_any(std::move(message)); OpaqueMessage opaque_message{.to_address = to_address, .from_address = from_address, .request_id = request_id, @@ -138,14 +138,14 @@ class LocalTransportHandle { cv_.notify_all(); } - template - ResponseFuture SubmitRequest(Address to_address, Address from_address, RequestT &&request, + template + ResponseFuture SubmitRequest(Address to_address, Address from_address, RValueRef request, Duration timeout, std::function fill_notifier) { auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( // set null notifier for when the Future::Wait is called nullptr, // set notifier for when Promise::Fill is called - std::forward>(fill_notifier)); + std::move(fill_notifier)); const bool port_matches = to_address.last_known_port == from_address.last_known_port; const bool ip_matches = to_address.last_known_ip == from_address.last_known_ip; @@ -168,7 +168,7 @@ class LocalTransportHandle { promises_.emplace(std::move(promise_key), std::move(dop)); } // lock dropped - Send(to_address, from_address, request_id, std::forward(request)); + Send(to_address, from_address, request_id, std::move(request)); return std::move(future); } diff --git a/src/io/rsm/raft.hpp b/src/io/rsm/raft.hpp index 07a51288d..3be15cc46 100644 --- a/src/io/rsm/raft.hpp +++ b/src/io/rsm/raft.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -19,6 +19,7 @@ #include #include #include +#include #include #include @@ -246,7 +247,7 @@ to a CAS operation. template concept Rsm = requires(ReplicatedState state, WriteOperation w, ReadOperation r) { - { state.Read(r) } -> std::same_as; + { state.Read(std::move(r)) } -> std::same_as; { state.Apply(w) } -> std::same_as; }; @@ -402,7 +403,7 @@ class Raft { const PendingClientRequest client_request = std::move(leader.pending_client_requests.at(apply_index)); leader.pending_client_requests.erase(apply_index); - const WriteResponse resp{ + WriteResponse resp{ .success = true, .write_return = std::move(write_return), .raft_index = apply_index, @@ -554,7 +555,7 @@ class Raft { for (const auto &peer : peers_) { // request_id not necessary to set because it's not a Future-backed Request. static constexpr auto request_id = 0; - io_.template Send(peer, request_id, request); + io_.template Send(peer, request_id, VoteRequest{request}); outstanding_votes.insert(peer); } @@ -624,13 +625,12 @@ class Raft { MG_ASSERT(std::max(req.term, state_.term) == req.term); } - const VoteResponse res{ - .term = std::max(req.term, state_.term), - .committed_log_size = state_.committed_log_size, - .vote_granted = new_leader, - }; - - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, + VoteResponse{ + .term = std::max(req.term, state_.term), + .committed_log_size = state_.committed_log_size, + .vote_granted = new_leader, + }); if (new_leader) { // become a follower @@ -718,6 +718,10 @@ class Raft { .log_size = state_.log.size(), }; + static_assert(std::is_trivially_copyable_v, + "This function copies this message, therefore it is important to be trivially copyable. Otherwise it " + "should be moved"); + if constexpr (std::is_same()) { MG_ASSERT(req.term != state_.term, "Multiple leaders are acting under the term ", req.term); } @@ -736,7 +740,7 @@ class Raft { // become follower of this leader, reply with our log status state_.term = req.term; - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, AppendResponse{res}); Log("becoming Follower of Leader ", from_address.last_known_port, " at term ", req.term); return Follower{ @@ -747,7 +751,7 @@ class Raft { if (req.term < state_.term) { // nack this request from an old leader - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, AppendResponse{res}); return std::nullopt; } @@ -808,7 +812,7 @@ class Raft { Log("returning log_size of ", res.log_size); - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, AppendResponse{res}); return std::nullopt; } @@ -859,17 +863,17 @@ class Raft { auto type_info = TypeInfoFor(req); std::string demangled_name = boost::core::demangle(type_info.get().name()); Log("handling ReadOperation<" + demangled_name + ">"); - ReadOperation read_operation = req.operation; + ReadOperation &read_operation = req.operation; - ReadResponseValue read_return = replicated_state_.Read(read_operation); + ReadResponseValue read_return = replicated_state_.Read(std::move(read_operation)); - const ReadResponse resp{ + ReadResponse resp{ .success = true, .read_return = std::move(read_return), .retry_leader = std::nullopt, }; - io_.Send(from_address, request_id, resp); + io_.Send(from_address, request_id, std::move(resp)); return std::nullopt; } @@ -878,11 +882,11 @@ class Raft { std::optional Handle(Candidate & /* variable */, ReadRequest && /* variable */, RequestId request_id, Address from_address) { Log("received ReadOperation - not redirecting because no Leader is known"); - const ReadResponse res{ + ReadResponse res{ .success = false, }; - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, std::move(res)); Cron(); @@ -894,12 +898,12 @@ class Raft { Address from_address) { Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port); - const ReadResponse res{ + ReadResponse res{ .success = false, .retry_leader = follower.leader_address, }; - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, std::move(res)); return std::nullopt; } @@ -913,12 +917,12 @@ class Raft { Address from_address) { Log("redirecting client to known Leader with port ", follower.leader_address.last_known_port); - const WriteResponse res{ + WriteResponse res{ .success = false, .retry_leader = follower.leader_address, }; - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, std::move(res)); return std::nullopt; } @@ -927,11 +931,11 @@ class Raft { RequestId request_id, Address from_address) { Log("received WriteRequest - not redirecting because no Leader is known"); - const WriteResponse res{ + WriteResponse res{ .success = false, }; - io_.Send(from_address, request_id, res); + io_.Send(from_address, request_id, std::move(res)); Cron(); diff --git a/src/io/rsm/rsm_client.hpp b/src/io/rsm/rsm_client.hpp index 2b76b6399..bc5dc6a47 100644 --- a/src/io/rsm/rsm_client.hpp +++ b/src/io/rsm/rsm_client.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -95,7 +95,7 @@ class RsmClient { BasicResult SendWriteRequest(WriteRequestT req) { Notifier notifier; const ReadinessToken readiness_token{0}; - SendAsyncWriteRequest(req, notifier, readiness_token); + SendAsyncWriteRequest(std::move(req), notifier, readiness_token); auto poll_result = AwaitAsyncWriteRequest(readiness_token); while (!poll_result) { poll_result = AwaitAsyncWriteRequest(readiness_token); @@ -106,7 +106,7 @@ class RsmClient { BasicResult SendReadRequest(ReadRequestT req) { Notifier notifier; const ReadinessToken readiness_token{0}; - SendAsyncReadRequest(req, notifier, readiness_token); + SendAsyncReadRequest(std::move(req), notifier, readiness_token); auto poll_result = AwaitAsyncReadRequest(readiness_token); while (!poll_result) { poll_result = AwaitAsyncReadRequest(readiness_token); @@ -115,15 +115,15 @@ class RsmClient { } /// AsyncRead methods - void SendAsyncReadRequest(const ReadRequestT &req, Notifier notifier, ReadinessToken readiness_token) { + void SendAsyncReadRequest(ReadRequestT &&req, Notifier notifier, ReadinessToken readiness_token) { ReadRequest read_req = {.operation = req}; AsyncRequest> async_request{ .start_time = io_.Now(), .request = std::move(req), .notifier = notifier, - .future = io_.template RequestWithNotification, ReadResponse>( - leader_, read_req, notifier, readiness_token), + .future = io_.template RequestWithNotification, ReadRequest>( + leader_, std::move(read_req), notifier, readiness_token), }; async_reads_.emplace(readiness_token.GetId(), std::move(async_request)); @@ -134,8 +134,8 @@ class RsmClient { ReadRequest read_req = {.operation = async_request.request}; - async_request.future = io_.template RequestWithNotification, ReadResponse>( - leader_, read_req, async_request.notifier, readiness_token); + async_request.future = io_.template RequestWithNotification, ReadRequest>( + leader_, std::move(read_req), async_request.notifier, readiness_token); } std::optional> PollAsyncReadRequest(const ReadinessToken &readiness_token) { @@ -184,15 +184,15 @@ class RsmClient { } /// AsyncWrite methods - void SendAsyncWriteRequest(const WriteRequestT &req, Notifier notifier, ReadinessToken readiness_token) { + void SendAsyncWriteRequest(WriteRequestT &&req, Notifier notifier, ReadinessToken readiness_token) { WriteRequest write_req = {.operation = req}; AsyncRequest> async_request{ .start_time = io_.Now(), .request = std::move(req), .notifier = notifier, - .future = io_.template RequestWithNotification, WriteResponse>( - leader_, write_req, notifier, readiness_token), + .future = io_.template RequestWithNotification, WriteRequest>( + leader_, std::move(write_req), notifier, readiness_token), }; async_writes_.emplace(readiness_token.GetId(), std::move(async_request)); @@ -204,8 +204,8 @@ class RsmClient { WriteRequest write_req = {.operation = async_request.request}; async_request.future = - io_.template RequestWithNotification, WriteResponse>( - leader_, write_req, async_request.notifier, readiness_token); + io_.template RequestWithNotification, WriteRequest>( + leader_, std::move(write_req), async_request.notifier, readiness_token); } std::optional> PollAsyncWriteRequest(const ReadinessToken &readiness_token) { diff --git a/src/io/simulator/simulator_handle.hpp b/src/io/simulator/simulator_handle.hpp index 3fd9b4965..a34e93c66 100644 --- a/src/io/simulator/simulator_handle.hpp +++ b/src/io/simulator/simulator_handle.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -105,19 +105,19 @@ class SimulatorHandle { bool ShouldShutDown() const; - template - ResponseFuture SubmitRequest(Address to_address, Address from_address, Request &&request, Duration timeout, - std::function &&maybe_tick_simulator, - std::function &&fill_notifier) { + template + ResponseFuture SubmitRequest(Address to_address, Address from_address, RValueRef request, + Duration timeout, std::function &&maybe_tick_simulator, + std::function &&fill_notifier) { auto type_info = TypeInfoFor(request); std::string demangled_name = boost::core::demangle(type_info.get().name()); spdlog::trace("simulator sending request {} to {}", demangled_name, to_address); - auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( + auto [future, promise] = memgraph::io::FuturePromisePairWithNotifications>( // set notifier for when the Future::Wait is called - std::forward>(maybe_tick_simulator), + std::move(maybe_tick_simulator), // set notifier for when Promise::Fill is called - std::forward>(fill_notifier)); + std::move(fill_notifier)); { std::unique_lock lock(mu_); @@ -126,7 +126,7 @@ class SimulatorHandle { const Time deadline = cluster_wide_time_microseconds_ + timeout; - std::any message(request); + std::any message(std::move(request)); OpaqueMessage om{.to_address = to_address, .from_address = from_address, .request_id = request_id, @@ -194,7 +194,7 @@ class SimulatorHandle { } template - void Send(Address to_address, Address from_address, RequestId request_id, M message) { + void Send(Address to_address, Address from_address, RequestId request_id, RValueRef message) { spdlog::trace("sending message from {} to {}", from_address.last_known_port, to_address.last_known_port); auto type_info = TypeInfoFor(message); { diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 1272a04a1..05775e726 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -33,14 +33,14 @@ class SimulatorTransport { SimulatorTransport(std::shared_ptr simulator_handle, Address address, uint64_t seed) : simulator_handle_(simulator_handle), address_(address), rng_(std::mt19937{seed}) {} - template - ResponseFuture Request(Address to_address, Address from_address, RequestT request, + template + ResponseFuture Request(Address to_address, Address from_address, RValueRef request, std::function notification, Duration timeout) { std::function tick_simulator = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); }; - return simulator_handle_->template SubmitRequest( + return simulator_handle_->template SubmitRequest( to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification)); } @@ -50,8 +50,8 @@ class SimulatorTransport { } template - void Send(Address to_address, Address from_address, uint64_t request_id, M message) { - return simulator_handle_->template Send(to_address, from_address, request_id, message); + void Send(Address to_address, Address from_address, uint64_t request_id, RValueRef message) { + return simulator_handle_->template Send(to_address, from_address, request_id, std::move(message)); } Time Now() const { return simulator_handle_->Now(); } diff --git a/src/io/transport.hpp b/src/io/transport.hpp index 5dd7a9a39..eade62db3 100644 --- a/src/io/transport.hpp +++ b/src/io/transport.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -22,6 +22,7 @@ #include "io/message_histogram_collector.hpp" #include "io/notifier.hpp" #include "io/time.hpp" +#include "utils/concepts.hpp" #include "utils/result.hpp" namespace memgraph::io { @@ -32,7 +33,15 @@ using memgraph::utils::BasicResult; // reasonable constraints around message types over time, // as we adapt things to use Thrift-generated message types. template -concept Message = std::same_as>; +concept Message = std::movable && std::copyable; + +template +struct RValueRefEnforcer { + using Type = T &&; +}; + +template +using RValueRef = typename RValueRefEnforcer::Type; using RequestId = uint64_t; @@ -82,44 +91,44 @@ class Io { Duration GetDefaultTimeout() { return default_timeout_; } /// Issue a request with an explicit timeout in microseconds provided. This tends to be used by clients. - template - ResponseFuture RequestWithTimeout(Address address, RequestT request, Duration timeout) { + template + ResponseFuture RequestWithTimeout(Address address, RValueRef request, Duration timeout) { const Address from_address = address_; std::function fill_notifier = nullptr; - return implementation_.template Request(address, from_address, request, fill_notifier, - timeout); + return implementation_.template Request(address, from_address, std::move(request), + fill_notifier, timeout); } /// Issue a request that times out after the default timeout. This tends /// to be used by clients. - template - ResponseFuture Request(Address to_address, RequestT request) { + template + ResponseFuture Request(Address to_address, RValueRef request) { const Duration timeout = default_timeout_; const Address from_address = address_; std::function fill_notifier = nullptr; - return implementation_.template Request(to_address, from_address, std::move(request), + return implementation_.template Request(to_address, from_address, std::move(request), fill_notifier, timeout); } /// Issue a request that will notify a Notifier when it is filled or times out. - template - ResponseFuture RequestWithNotification(Address to_address, RequestT request, Notifier notifier, + template + ResponseFuture RequestWithNotification(Address to_address, RValueRef request, Notifier notifier, ReadinessToken readiness_token) { const Duration timeout = default_timeout_; const Address from_address = address_; std::function fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); }; - return implementation_.template Request(to_address, from_address, std::move(request), + return implementation_.template Request(to_address, from_address, std::move(request), fill_notifier, timeout); } /// Issue a request that will notify a Notifier when it is filled or times out. - template - ResponseFuture RequestWithNotificationAndTimeout(Address to_address, RequestT request, Notifier notifier, + template + ResponseFuture RequestWithNotificationAndTimeout(Address to_address, RequestT &&request, Notifier notifier, ReadinessToken readiness_token, Duration timeout) { const Address from_address = address_; std::function fill_notifier = [notifier, readiness_token]() { notifier.Notify(readiness_token); }; - return implementation_.template Request(to_address, from_address, std::move(request), - fill_notifier, timeout); + return implementation_.template Request(to_address, from_address, std::forward(request), + fill_notifier, timeout); } /// Wait for an explicit number of microseconds for a request of one of the @@ -141,9 +150,9 @@ class Io { /// responses are not necessarily expected, and for servers to respond to requests. /// If you need reliable delivery, this must be built on-top. TCP is not enough for most use cases. template - void Send(Address to_address, RequestId request_id, M message) { + void Send(Address to_address, RequestId request_id, M &&message) { Address from_address = address_; - return implementation_.template Send(to_address, from_address, request_id, std::move(message)); + return implementation_.template Send(to_address, from_address, request_id, std::forward(message)); } /// The current system time. This time source should be preferred over any other, diff --git a/src/parser/CMakeLists.txt b/src/parser/CMakeLists.txt index 7575b0529..b7ae43178 100644 --- a/src/parser/CMakeLists.txt +++ b/src/parser/CMakeLists.txt @@ -23,7 +23,7 @@ add_custom_command( COMMAND ${CMAKE_COMMAND} -E make_directory ${opencypher_generated} COMMAND java -jar ${CMAKE_SOURCE_DIR}/libs/antlr-4.10.1-complete.jar - -Dlanguage=Cpp -visitor -package antlropencypher + -Dlanguage=Cpp -visitor -package antlropencypher::v2 -o ${opencypher_generated} ${opencypher_lexer_grammar} ${opencypher_parser_grammar} WORKING_DIRECTORY "${CMAKE_BINARY_DIR}" diff --git a/src/parser/opencypher/parser.hpp b/src/parser/opencypher/parser.hpp index 9a57bc65b..25fdabf4b 100644 --- a/src/parser/opencypher/parser.hpp +++ b/src/parser/opencypher/parser.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -14,10 +14,10 @@ #include #include "antlr4-runtime.h" -#include "utils/exceptions.hpp" #include "parser/opencypher/generated/MemgraphCypher.h" #include "parser/opencypher/generated/MemgraphCypherLexer.h" #include "utils/concepts.hpp" +#include "utils/exceptions.hpp" namespace memgraph::frontend::opencypher { @@ -32,11 +32,9 @@ class SyntaxException : public utils::BasicException { * This thing must me a class since parser.cypher() returns pointer and there is * no way for us to get ownership over the object. */ -enum class ParserOpTag : uint8_t { - CYPHER, EXPRESSION -}; +enum class ParserOpTag : uint8_t { CYPHER, EXPRESSION }; -template +template class Parser { public: /** @@ -46,10 +44,9 @@ class Parser { Parser(const std::string query) : query_(std::move(query)) { parser_.removeErrorListeners(); parser_.addErrorListener(&error_listener_); - if constexpr(Tag == ParserOpTag::CYPHER) { + if constexpr (Tag == ParserOpTag::CYPHER) { tree_ = parser_.cypher(); - } - else { + } else { tree_ = parser_.expression(); } if (parser_.getNumberOfSyntaxErrors()) { @@ -75,11 +72,11 @@ class Parser { FirstMessageErrorListener error_listener_; std::string query_; antlr4::ANTLRInputStream input_{query_}; - antlropencypher::MemgraphCypherLexer lexer_{&input_}; + antlropencypher::v2::MemgraphCypherLexer lexer_{&input_}; antlr4::CommonTokenStream tokens_{&lexer_}; // generate ast - antlropencypher::MemgraphCypher parser_{&tokens_}; + antlropencypher::v2::MemgraphCypher parser_{&tokens_}; antlr4::tree::ParseTree *tree_ = nullptr; }; } // namespace memgraph::frontend::opencypher diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index 0303f2fa5..d5545cd95 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -48,18 +48,20 @@ add_dependencies(mg-query generate_lcp_query) target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include) target_link_libraries(mg-query dl cppitertools Boost::headers) target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-license mg-utils mg-kvstore mg-memory) + if(NOT "${MG_PYTHON_PATH}" STREQUAL "") set(Python3_ROOT_DIR "${MG_PYTHON_PATH}") endif() + if("${MG_PYTHON_VERSION}" STREQUAL "") find_package(Python3 3.5 REQUIRED COMPONENTS Development) else() find_package(Python3 "${MG_PYTHON_VERSION}" EXACT REQUIRED COMPONENTS Development) endif() + target_link_libraries(mg-query Python3::Python) # Generate Antlr openCypher parser - set(opencypher_frontend ${CMAKE_CURRENT_SOURCE_DIR}/frontend/opencypher) set(opencypher_generated ${opencypher_frontend}/generated) set(opencypher_lexer_grammar ${opencypher_frontend}/grammar/MemgraphCypherLexer.g4) @@ -82,15 +84,15 @@ add_custom_command( OUTPUT ${antlr_opencypher_generated_src} ${antlr_opencypher_generated_include} COMMAND ${CMAKE_COMMAND} -E make_directory ${opencypher_generated} COMMAND - java -jar ${CMAKE_SOURCE_DIR}/libs/antlr-4.10.1-complete.jar - -Dlanguage=Cpp -visitor -package antlropencypher - -o ${opencypher_generated} - ${opencypher_lexer_grammar} ${opencypher_parser_grammar} + java -jar ${CMAKE_SOURCE_DIR}/libs/antlr-4.10.1-complete.jar + -Dlanguage=Cpp -visitor -package antlropencypher + -o ${opencypher_generated} + ${opencypher_lexer_grammar} ${opencypher_parser_grammar} WORKING_DIRECTORY "${CMAKE_BINARY_DIR}" DEPENDS - ${opencypher_lexer_grammar} ${opencypher_parser_grammar} - ${opencypher_frontend}/grammar/CypherLexer.g4 - ${opencypher_frontend}/grammar/Cypher.g4) + ${opencypher_lexer_grammar} ${opencypher_parser_grammar} + ${opencypher_frontend}/grammar/CypherLexer.g4 + ${opencypher_frontend}/grammar/Cypher.g4) add_custom_target(generate_opencypher_parser DEPENDS ${antlr_opencypher_generated_src} ${antlr_opencypher_generated_include}) diff --git a/src/query/v2/cypher_query_interpreter.cpp b/src/query/v2/cypher_query_interpreter.cpp index f3f8e48d7..6ac6b36fa 100644 --- a/src/query/v2/cypher_query_interpreter.cpp +++ b/src/query/v2/cypher_query_interpreter.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -14,9 +14,9 @@ #include "query/v2/request_router.hpp" // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); +DEFINE_HIDDEN_bool(query_v2_cost_planner, true, "Use the cost-estimating query planner."); // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DEFINE_VALIDATED_int32(query_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.", +DEFINE_VALIDATED_int32(query_v2_plan_cache_ttl, 60, "Time to live for cached query plans, in seconds.", FLAG_IN_RANGE(0, std::numeric_limits::max())); namespace memgraph::query::v2 { @@ -123,7 +123,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery auto vertex_counts = plan::MakeVertexCountCache(request_router); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); - auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner); + auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_v2_cost_planner); return std::make_unique(std::move(root), cost, std::move(ast_storage), std::move(symbol_table)); } diff --git a/src/query/v2/cypher_query_interpreter.hpp b/src/query/v2/cypher_query_interpreter.hpp index 688e52fed..18505820f 100644 --- a/src/query/v2/cypher_query_interpreter.hpp +++ b/src/query/v2/cypher_query_interpreter.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -22,9 +22,9 @@ #include "utils/timer.hpp" // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DECLARE_bool(query_cost_planner); +DECLARE_bool(query_v2_cost_planner); // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) -DECLARE_int32(query_plan_cache_ttl); +DECLARE_int32(query_v2_plan_cache_ttl); namespace memgraph::query::v2 { @@ -58,7 +58,7 @@ class CachedPlan { bool IsExpired() const { // NOLINTNEXTLINE (modernize-use-nullptr) - return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_plan_cache_ttl); + return cache_timer_.Elapsed() > std::chrono::seconds(FLAGS_query_v2_plan_cache_ttl); }; private: diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 993d24282..39ccf51ef 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -3087,25 +3087,18 @@ class DistributedExpandCursor : public Cursor { MG_ASSERT(direction != EdgeAtom::Direction::BOTH); const auto &edge = frame[self_.common_.edge_symbol].ValueEdge(); static constexpr auto get_dst_vertex = [](const EdgeAccessor &edge, - const EdgeAtom::Direction direction) -> msgs::VertexId { + const EdgeAtom::Direction direction) -> accessors::VertexAccessor { switch (direction) { case EdgeAtom::Direction::IN: - return edge.From().Id(); + return edge.From(); case EdgeAtom::Direction::OUT: - return edge.To().Id(); + return edge.To(); case EdgeAtom::Direction::BOTH: throw std::runtime_error("EdgeDirection Both not implemented"); } }; - msgs::GetPropertiesRequest request; - // to not fetch any properties of the edges - request.vertex_ids.push_back(get_dst_vertex(edge, direction)); - auto result_rows = context.request_router->GetProperties(std::move(request)); - MG_ASSERT(result_rows.size() == 1); - auto &result_row = result_rows.front(); - frame[self_.common_.node_symbol] = - accessors::VertexAccessor(msgs::Vertex{result_row.vertex}, result_row.props, context.request_router); + frame[self_.common_.node_symbol] = get_dst_vertex(edge, direction); } bool InitEdges(Frame &frame, ExecutionContext &context) { @@ -3129,6 +3122,8 @@ class DistributedExpandCursor : public Cursor { // to not fetch any properties of the edges request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); + request.edge_properties.emplace(); + request.src_vertex_properties.emplace(); auto result_rows = std::invoke([&context, &request]() mutable { SCOPED_REQUEST_WAIT_PROFILE; return context.request_router->ExpandOne(std::move(request)); @@ -3271,6 +3266,7 @@ class DistributedExpandCursor : public Cursor { [](const storage::v3::EdgeTypeId edge_type_id) { return msgs::EdgeType{edge_type_id}; }); // to not fetch any properties of the edges request.edge_properties.emplace(); + request.src_vertex_properties.emplace(); for (const auto &frame : own_multi_frame_->GetValidFramesReader()) { const auto &vertex_value = frame[self_.input_symbol_]; diff --git a/src/query/v2/plan/rewrite/index_lookup.cpp b/src/query/v2/plan/rewrite/index_lookup.cpp index 795f15fa4..b12a26da1 100644 --- a/src/query/v2/plan/rewrite/index_lookup.cpp +++ b/src/query/v2/plan/rewrite/index_lookup.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -13,7 +13,8 @@ #include "utils/flag_validation.hpp" -DEFINE_VALIDATED_HIDDEN_int64(query_vertex_count_to_expand_existing, 10, +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_HIDDEN_int64(query_v2_vertex_count_to_expand_existing, 10, "Maximum count of indexed vertices which provoke " "indexed lookup and then expand to existing, instead of " "a regular expand. Default is 10, to turn off use -1.", diff --git a/src/query/v2/plan/rewrite/index_lookup.hpp b/src/query/v2/plan/rewrite/index_lookup.hpp index 0b9b9cb97..7f5b601e5 100644 --- a/src/query/v2/plan/rewrite/index_lookup.hpp +++ b/src/query/v2/plan/rewrite/index_lookup.hpp @@ -30,7 +30,8 @@ #include "query/v2/plan/preprocess.hpp" #include "storage/v3/id_types.hpp" -DECLARE_int64(query_vertex_count_to_expand_existing); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_int64(query_v2_vertex_count_to_expand_existing); namespace memgraph::query::v2::plan { @@ -100,7 +101,7 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor { return true; } ScanAll dst_scan(expand.input(), expand.common_.node_symbol, expand.view_); - auto indexed_scan = GenScanByIndex(dst_scan, FLAGS_query_vertex_count_to_expand_existing); + auto indexed_scan = GenScanByIndex(dst_scan, FLAGS_query_v2_vertex_count_to_expand_existing); if (indexed_scan) { expand.set_input(std::move(indexed_scan)); expand.common_.existing_node = true; @@ -129,7 +130,7 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor { // unconditionally creating an indexed scan. indexed_scan = GenScanByIndex(dst_scan); } else { - indexed_scan = GenScanByIndex(dst_scan, FLAGS_query_vertex_count_to_expand_existing); + indexed_scan = GenScanByIndex(dst_scan, FLAGS_query_v2_vertex_count_to_expand_existing); } if (indexed_scan) { expand.set_input(std::move(indexed_scan)); diff --git a/src/query/v2/plan/variable_start_planner.cpp b/src/query/v2/plan/variable_start_planner.cpp index b6d15f73d..ec23be1af 100644 --- a/src/query/v2/plan/variable_start_planner.cpp +++ b/src/query/v2/plan/variable_start_planner.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -17,7 +17,8 @@ #include "utils/flag_validation.hpp" #include "utils/logging.hpp" -DEFINE_VALIDATED_HIDDEN_uint64(query_max_plans, 1000U, "Maximum number of generated plans for a query.", +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_VALIDATED_HIDDEN_uint64(query_v2_max_plans, 1000U, "Maximum number of generated plans for a query.", FLAG_IN_RANGE(1, std::numeric_limits::max())); namespace memgraph::query::v2::plan::impl { diff --git a/src/query/v2/plan/variable_start_planner.hpp b/src/query/v2/plan/variable_start_planner.hpp index 27722b6b2..01d7cef58 100644 --- a/src/query/v2/plan/variable_start_planner.hpp +++ b/src/query/v2/plan/variable_start_planner.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -18,7 +18,8 @@ #include "query/v2/plan/rule_based_planner.hpp" -DECLARE_uint64(query_max_plans); +// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(query_v2_max_plans); namespace memgraph::query::v2::plan { @@ -310,7 +311,7 @@ class VariableStartPlanner { for (const auto &query_part : query_parts) { alternative_query_parts.emplace_back(impl::VaryQueryPartMatching(query_part, symbol_table)); } - return iter::slice(MakeCartesianProduct(std::move(alternative_query_parts)), 0UL, FLAGS_query_max_plans); + return iter::slice(MakeCartesianProduct(std::move(alternative_query_parts)), 0UL, FLAGS_query_v2_max_plans); } public: diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index a8326d900..04107a73e 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -323,8 +323,8 @@ class RequestRouter : public RequestRouterInterface { io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::WriteRequests req = request.request; - storage_client.SendAsyncWriteRequest(req, notifier_, readiness_token); - running_requests.emplace(readiness_token.GetId(), request); + storage_client.SendAsyncWriteRequest(std::move(req), notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), std::move(request)); } // drive requests to completion @@ -339,7 +339,8 @@ class RequestRouter : public RequestRouterInterface { // must be fetched again with an ExpandOne(Edges.dst) // create requests - std::vector> requests_to_be_sent = RequestsForExpandOne(request); + std::vector> requests_to_be_sent = + RequestsForExpandOne(std::move(request)); // begin all requests in parallel RunningRequests running_requests = {}; @@ -349,8 +350,8 @@ class RequestRouter : public RequestRouterInterface { io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::ReadRequests req = request.request; - storage_client.SendAsyncReadRequest(req, notifier_, readiness_token); - running_requests.emplace(readiness_token.GetId(), request); + storage_client.SendAsyncReadRequest(std::move(req), notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), std::move(request)); } // drive requests to completion @@ -386,8 +387,8 @@ class RequestRouter : public RequestRouterInterface { io::ReadinessToken readiness_token{i}; auto &storage_client = GetStorageClientForShard(request.shard); msgs::ReadRequests req = request.request; - storage_client.SendAsyncReadRequest(req, notifier_, readiness_token); - running_requests.emplace(readiness_token.GetId(), request); + storage_client.SendAsyncReadRequest(std::move(req), notifier_, readiness_token); + running_requests.emplace(readiness_token.GetId(), std::move(request)); } // drive requests to completion @@ -503,6 +504,7 @@ class RequestRouter : public RequestRouterInterface { msgs::ScanVerticesRequest request; request.transaction_id = transaction_id_; + request.props_to_return.emplace(); request.start_id.second = storage::conversions::ConvertValueVector(key); ShardRequestState shard_request_state{ @@ -517,7 +519,7 @@ class RequestRouter : public RequestRouterInterface { return requests; } - std::vector> RequestsForExpandOne(const msgs::ExpandOneRequest &request) { + std::vector> RequestsForExpandOne(msgs::ExpandOneRequest &&request) { std::map per_shard_request_table; msgs::ExpandOneRequest top_level_rqst_template = request; top_level_rqst_template.transaction_id = transaction_id_; @@ -529,7 +531,7 @@ class RequestRouter : public RequestRouterInterface { if (!per_shard_request_table.contains(shard)) { per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); } - per_shard_request_table[shard].src_vertices.push_back(vertex); + per_shard_request_table[shard].src_vertices.push_back(std::move(vertex)); } std::vector> requests = {}; @@ -726,11 +728,11 @@ class RequestRouter : public RequestRouterInterface { coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; io::rsm::WriteRequest ww; - ww.operation = requests; - auto resp = - io_.template Request, - io::rsm::WriteResponse>(coordinator_address, ww) - .Wait(); + ww.operation = std::move(requests); + auto resp = io_.template Request, + io::rsm::WriteRequest>(coordinator_address, + std::move(ww)) + .Wait(); if (resp.HasValue()) { const auto alloc_edge_id_reps = std::get(resp.GetValue().message.write_return); diff --git a/src/storage/v3/expr.cpp b/src/storage/v3/expr.cpp index 36c0ff8e3..185e4e29e 100644 --- a/src/storage/v3/expr.cpp +++ b/src/storage/v3/expr.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -35,7 +35,7 @@ msgs::Value ConstructValueVertex(const VertexAccessor &acc, View view) { memgraph::msgs::Label value_label{.id = prim_label}; auto prim_key = conversions::ConvertValueVector(acc.PrimaryKey(view).GetValue()); - memgraph::msgs::VertexId vertex_id = std::make_pair(value_label, prim_key); + memgraph::msgs::VertexId vertex_id = std::make_pair(value_label, std::move(prim_key)); // Get the labels auto vertex_labels = acc.Labels(view).GetValue(); @@ -45,7 +45,7 @@ msgs::Value ConstructValueVertex(const VertexAccessor &acc, View view) { std::transform(vertex_labels.begin(), vertex_labels.end(), std::back_inserter(value_labels), [](const auto &label) { return msgs::Label{.id = label}; }); - return msgs::Value({.id = vertex_id, .labels = value_labels}); + return msgs::Value({.id = std::move(vertex_id), .labels = std::move(value_labels)}); } msgs::Value ConstructValueEdge(const EdgeAccessor &acc, View view) { diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index f13c5a82e..8e20d8e58 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -46,7 +46,6 @@ struct VertexIdCmpr { std::optional> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema) { std::map ret; - auto props = acc.Properties(view); auto maybe_pk = acc.PrimaryKey(view); if (maybe_pk.HasError()) { spdlog::debug("Encountered an error while trying to get vertex primary key."); @@ -58,7 +57,7 @@ std::optional> PrimaryKeysFromAccessor(const VertexA ret.emplace(schema.second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); } - return ret; + return {std::move(ret)}; } ShardResult> FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, @@ -99,7 +98,7 @@ ShardResult> FillUpSourceVertexProperties(const std: } auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); if (pks) { - src_vertex_properties.merge(*pks); + src_vertex_properties.merge(std::move(*pks)); } } else if (req.src_vertex_properties.value().empty()) { @@ -384,13 +383,10 @@ bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, con } ShardResult GetExpandOneResult( - Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniqueness, const EdgeFiller &edge_filler, const Schemas::Schema &schema) { /// Fill up source vertex - const auto primary_key = ConvertPropertyVector(src_vertex.second); - auto v_acc = acc.FindVertex(primary_key, View::NEW); - msgs::Vertex source_vertex = {.id = src_vertex}; auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); if (maybe_secondary_labels.HasError()) { diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index bbe4894e9..37391cb3d 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -233,7 +233,7 @@ ShardResult> CollectAllPropertiesImpl(const TAccesso [](std::pair &pair) { return std::make_pair(pair.first, conversions::FromPropertyValueToValue(std::move(pair.second))); }); - return ret; + return {std::move(ret)}; } } // namespace impl @@ -247,7 +247,7 @@ EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbo EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req); ShardResult GetExpandOneResult( - Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, + VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, const EdgeUniquenessFunction &maybe_filter_based_on_edge_uniqueness, const EdgeFiller &edge_filler, const Schemas::Schema &schema); diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index 74ec8d2d1..ae7a5f4ef 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -250,8 +250,8 @@ class ShardManager { spdlog::info("SM sending heartbeat to coordinator {}", coordinator_leader_.ToString()); heartbeat_res_.emplace(std::move( - io_.template Request, WriteResponse>( - coordinator_leader_, ww))); + io_.template Request, WriteRequest>( + coordinator_leader_, std::move(ww)))); spdlog::info("SM sent heartbeat"); } diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 881796a70..6f71c7269 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -472,7 +472,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { if (req.order_by_edges.empty()) { const auto *schema = shard_->GetSchema(shard_->PrimaryLabel()); MG_ASSERT(schema); - return GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniqueness, edge_filler, *schema); + return GetExpandOneResult(src_vertex_acc, std::move(src_vertex), req, maybe_filter_based_on_edge_uniqueness, + edge_filler, *schema); } auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); const auto in_ordered_edges = OrderByEdges(dba, in_edge_accessors, req.order_by_edges, src_vertex_acc); @@ -487,12 +488,13 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { [](const auto &edge_element) { return edge_element.object_acc; }); const auto *schema = shard_->GetSchema(shard_->PrimaryLabel()); MG_ASSERT(schema); - return GetExpandOneResult(src_vertex_acc, src_vertex, req, in_edge_ordered_accessors, out_edge_ordered_accessors, - maybe_filter_based_on_edge_uniqueness, edge_filler, *schema); + return GetExpandOneResult(src_vertex_acc, std::move(src_vertex), req, std::move(in_edge_ordered_accessors), + std::move(out_edge_ordered_accessors), maybe_filter_based_on_edge_uniqueness, + edge_filler, *schema); }); if (maybe_result.HasError()) { - shard_error.emplace(CreateErrorResponse(primary_key.GetError(), req.transaction_id, "getting primary key")); + shard_error.emplace(CreateErrorResponse(maybe_result.GetError(), req.transaction_id, "getting expand result")); break; } @@ -581,12 +583,12 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { if (maybe_id.HasError()) { return {maybe_id.GetError()}; } - const auto &id = maybe_id.GetValue(); + auto &vertex_id = maybe_id.GetValue(); std::optional e_id; if (e_acc) { e_id = msgs::EdgeId{e_acc->Gid().AsUint()}; } - msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; + msgs::VertexId v_id{msgs::Label{vertex_id.primary_label}, ConvertValueVector(std::move(vertex_id.primary_key))}; auto maybe_props = collect_props(v_acc, e_acc); if (maybe_props.HasError()) { return {maybe_props.GetError()}; diff --git a/src/storage/v3/shard_rsm.hpp b/src/storage/v3/shard_rsm.hpp index 8a5533c3e..9c178eccc 100644 --- a/src/storage/v3/shard_rsm.hpp +++ b/src/storage/v3/shard_rsm.hpp @@ -57,7 +57,7 @@ class ShardRsm { } // NOLINTNEXTLINE(readability-convert-member-functions-to-static) - msgs::ReadResponses Read(msgs::ReadRequests requests) { + msgs::ReadResponses Read(msgs::ReadRequests &&requests) { return std::visit([&](auto &&request) mutable { return HandleRead(std::forward(request)); }, std::move(requests)); } diff --git a/src/storage/v3/shard_worker.hpp b/src/storage/v3/shard_worker.hpp index 547aa0a6f..121c4be70 100644 --- a/src/storage/v3/shard_worker.hpp +++ b/src/storage/v3/shard_worker.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -100,7 +100,7 @@ class Queue { inner_->submitted++; - inner_->queue.emplace_back(std::forward(message)); + inner_->queue.emplace_back(std::move(message)); } // lock dropped before notifying condition variable inner_->cv.notify_all(); diff --git a/src/storage/v3/value_conversions.hpp b/src/storage/v3/value_conversions.hpp index 53374e1ed..c068378ed 100644 --- a/src/storage/v3/value_conversions.hpp +++ b/src/storage/v3/value_conversions.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -126,6 +126,17 @@ inline std::vector ConvertValueVector(const std::vector ConvertValueVector(std::vector &&vec) { + std::vector ret; + ret.reserve(vec.size()); + + for (auto &&elem : vec) { + ret.push_back(FromPropertyValueToValue(std::move(elem))); + } + + return ret; +} + inline msgs::VertexId ToMsgsVertexId(const v3::VertexId &vertex_id) { return {msgs::Label{vertex_id.primary_label}, ConvertValueVector(vertex_id.primary_key)}; } diff --git a/src/utils/concepts.hpp b/src/utils/concepts.hpp index 7c1f3a9c8..9ffc04642 100644 --- a/src/utils/concepts.hpp +++ b/src/utils/concepts.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -12,6 +12,7 @@ #pragma once #include #include +#include namespace memgraph::utils { template @@ -34,4 +35,7 @@ template concept Dereferenceable = requires(T t) { { *t } -> CanReference; }; + +template +concept Object = std::is_object_v; } // namespace memgraph::utils diff --git a/tests/e2e/distributed_queries/awesome_memgraph_functions.py b/tests/e2e/distributed_queries/awesome_memgraph_functions.py index 0bdaa07a4..0babec2d3 100644 --- a/tests/e2e/distributed_queries/awesome_memgraph_functions.py +++ b/tests/e2e/distributed_queries/awesome_memgraph_functions.py @@ -36,7 +36,9 @@ def test_awesome_memgraph_functions(connection): assert len(results) == 1 assert results[0][0] == 5 - results = execute_and_fetch_all(cursor, "MATCH (n) WITH COLLECT(n.property) as nn RETURN ALL(i IN nn WHERE i > 0)") + results = execute_and_fetch_all( + cursor, "UNWIND [2, 1, 3] AS value WITH COLLECT(value) as nn RETURN ALL(i IN nn WHERE i > 0)" + ) assert len(results) == 1 assert results[0][0] == True diff --git a/tests/e2e/distributed_queries/distinct.py b/tests/e2e/distributed_queries/distinct.py index 9ebe50e6f..4a8560b03 100644 --- a/tests/e2e/distributed_queries/distinct.py +++ b/tests/e2e/distributed_queries/distinct.py @@ -9,11 +9,13 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -import typing -import mgclient import sys -import pytest import time +import typing + +import mgclient +import pytest + from common import * @@ -30,8 +32,7 @@ def test_distinct(connection): assert len(results) == 2 for i, n in enumerate(results): n_props = n[0].properties - assert len(n_props) == 1 - assert n_props["property"] == i + assert len(n_props) == 0 if __name__ == "__main__": diff --git a/tests/e2e/distributed_queries/distributed_expand_one.py b/tests/e2e/distributed_queries/distributed_expand_one.py index 9d3d81074..1a74daeb1 100644 --- a/tests/e2e/distributed_queries/distributed_expand_one.py +++ b/tests/e2e/distributed_queries/distributed_expand_one.py @@ -13,7 +13,12 @@ import sys import pytest -from common import connection, execute_and_fetch_all, has_n_result_row, wait_for_shard_manager_to_initialize +from common import ( + connection, + execute_and_fetch_all, + has_n_result_row, + wait_for_shard_manager_to_initialize, +) def test_sequenced_expand_one(connection): @@ -22,15 +27,21 @@ def test_sequenced_expand_one(connection): for i in range(1, 4): assert has_n_result_row(cursor, f"CREATE (:label {{property:{i}}})", 0), f"Failed creating node" - assert has_n_result_row(cursor, "MATCH (n {property:1}), (m {property:2}) CREATE (n)-[:TO]->(m)", 0) - assert has_n_result_row(cursor, "MATCH (n {property:2}), (m {property:3}) CREATE (n)-[:TO]->(m)", 0) + assert has_n_result_row(cursor, "MATCH (n:label {property:1}), (m:label {property:2}) CREATE (n)-[:TO]->(m)", 0) + assert has_n_result_row(cursor, "MATCH (n:label {property:2}), (m:label {property:3}) CREATE (n)-[:TO]->(m)", 0) results = execute_and_fetch_all(cursor, "MATCH (n)-[:TO]->(m)-[:TO]->(l) RETURN n,m,l") assert len(results) == 1 n, m, l = results[0] - assert n.properties["property"] == 1 - assert m.properties["property"] == 2 - assert l.properties["property"] == 3 + assert ( + len(n.properties) == 0 + ), "we don't return any properties of the node received from expansion and the bolt layer doesn't serialize the primary key of vertices" + assert ( + len(m.properties) == 0 + ), "we don't return any properties of the node received from expansion and the bolt layer doesn't serialize the primary key of vertices" + assert ( + len(l.properties) == 0 + ), "we don't return any properties of the node received from expansion and the bolt layer doesn't serialize the primary key of vertices" if __name__ == "__main__": diff --git a/tests/e2e/distributed_queries/distributed_queries.py b/tests/e2e/distributed_queries/distributed_queries.py index b4b6324d9..460aa517c 100644 --- a/tests/e2e/distributed_queries/distributed_queries.py +++ b/tests/e2e/distributed_queries/distributed_queries.py @@ -9,11 +9,13 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -import typing -import mgclient import sys -import pytest import time +import typing + +import mgclient +import pytest + from common import * @@ -35,13 +37,13 @@ def test_vertex_creation_and_scanall(connection): assert len(results) == 9 for (n, r, m) in results: n_props = n.properties - assert len(n_props) == 1, "n is not expected to have properties, update the test!" + assert len(n_props) == 0, "n is not expected to have properties, update the test!" assert len(n.labels) == 0, "n is not expected to have labels, update the test!" assert r.type == "TO" m_props = m.properties - assert m_props["property"] <= 3 and m_props["property"] >= 0, "Wrong key" + assert len(m_props) == 0, "n is not expected to have properties, update the test!" assert len(m.labels) == 0, "m is not expected to have labels, update the test!" diff --git a/tests/e2e/distributed_queries/order_by_and_limit.py b/tests/e2e/distributed_queries/order_by_and_limit.py index 05297f8f6..29be77857 100644 --- a/tests/e2e/distributed_queries/order_by_and_limit.py +++ b/tests/e2e/distributed_queries/order_by_and_limit.py @@ -9,11 +9,13 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -import typing -import mgclient import sys -import pytest import time +import typing + +import mgclient +import pytest + from common import * @@ -21,23 +23,23 @@ def test_order_by_and_limit(connection): wait_for_shard_manager_to_initialize() cursor = connection.cursor() - assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0) - - results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC") - assert len(results) == 4 - i = 4 - for n in results: - n_props = n[0].properties - assert len(n_props) == 1 - assert n_props["property"] == i + results = execute_and_fetch_all( + cursor, + "UNWIND [{property:1}, {property:3}, {property:2}] AS map RETURN map ORDER BY map.property DESC", + ) + assert len(results) == 3 + i = 3 + for map in results: + assert len(map) == 1 + assert map[0]["property"] == i i = i - 1 - result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1") + result = execute_and_fetch_all( + cursor, + "UNWIND [{property:1}, {property:3}, {property:2}] AS map RETURN map ORDER BY map.property LIMIT 1", + ) assert len(result) == 1 - assert result[0][0].properties["property"] == 1 + assert result[0][0]["property"] == 1 if __name__ == "__main__": diff --git a/tests/manual/CMakeLists.txt b/tests/manual/CMakeLists.txt index ea680105c..800cd0515 100644 --- a/tests/manual/CMakeLists.txt +++ b/tests/manual/CMakeLists.txt @@ -9,6 +9,7 @@ function(add_manual_test test_cpp) get_filename_component(exec_name ${test_cpp} NAME_WE) set(target_name ${test_prefix}${exec_name}) add_executable(${target_name} ${test_cpp} ${ARGN}) + # OUTPUT_NAME sets the real name of a target when it is built and can be # used to help create two targets of the same name even though CMake # requires unique logical target names @@ -21,7 +22,7 @@ target_link_libraries(${test_prefix}antlr_parser antlr_opencypher_parser_lib) add_manual_test(antlr_sigsegv.cpp) target_link_libraries(${test_prefix}antlr_sigsegv gtest gtest_main - antlr_opencypher_parser_lib mg-utils) + antlr_opencypher_parser_lib mg-utils) add_manual_test(antlr_tree_pretty_print.cpp) target_link_libraries(${test_prefix}antlr_tree_pretty_print antlr_opencypher_parser_lib) @@ -37,13 +38,15 @@ target_link_libraries(${test_prefix}query_hash mg-query) add_manual_test(query_planner.cpp interactive/planning.cpp) target_link_libraries(${test_prefix}query_planner mg-query) -if (READLINE_FOUND) + +if(READLINE_FOUND) target_link_libraries(${test_prefix}query_planner readline) endif() add_manual_test(query_execution_dummy.cpp) target_link_libraries(${test_prefix}query_execution_dummy mg-query) -if (READLINE_FOUND) + +if(READLINE_FOUND) target_link_libraries(${test_prefix}query_execution_dummy readline) endif() @@ -61,3 +64,6 @@ target_link_libraries(${test_prefix}ssl_client mg-communication) add_manual_test(ssl_server.cpp) target_link_libraries(${test_prefix}ssl_server mg-communication) + +add_manual_test(query_performance.cpp) +target_link_libraries(${test_prefix}query_performance mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2 mg-storage-v3 mg-query mg-storage-v2) diff --git a/tests/manual/query_performance.cpp b/tests/manual/query_performance.cpp new file mode 100644 index 000000000..8c7877b0d --- /dev/null +++ b/tests/manual/query_performance.cpp @@ -0,0 +1,352 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// This binary is meant to easily compare the performance of: +// - Memgraph v2 +// - Memgraph v3 +// - Memgraph v3 with MultiFrame +// This binary measures three things which provides a high level and easily understandable metric about the performance +// difference between the different versions: +// 1. Read time: how much time does it take to read the files: +// 2. Init time: how much time does it take to run the init queries, including the index creation. For details please +// check RunV2. +// 3. Benchmark time: how much time does it take to run the benchmark queries. +// To quickly compare performance of the different versions just change the query or queries in the benchmark queries +// file you can see the different by running this executable. This way we don't have keep multiple binaries of Memgraph +// v2 and Memgraph v3 with/without MultiFrame, start Memgraph and connect to it with mgconsole and other hassles. As +// everything is run in this binary, it makes easier to generate perf reports/flamegraphs from the query execution of +// different Memgraph versions compared to using the full blown version of Memgraph. +// +// A few important notes: +// - All the input files are mandated to have an empty line at the end of the file as the reading logic expect that. +// - tests/mgbench/dataset_creator_unwind.py is recommended to generate the dataset because it generates queries with +// UNWIND that makes the import faster in Memgraph v3, thus we can compare the performance on non trivial datasets +// also. To make it possible to use the generated dataset, you have to move the generated index queries into a +// separate file that can be supplied as index queries file for this binary when using Memgraph v2. The reason for +// this is Memgraph v3 cannot handle indices yet, thus it crashes. +// - Check the command line flags and their description defined in this file. +// - Also check out the --default-multi-frame-size command line flag if you want to play with that. +// - The log level is manually set to warning in the main function to avoid the overwhelming log messages from Memgraph +// v3. Apart from ease of use, the huge amount of looging can degrade the actual performance. +// +// Example usage with Memgraph v2: +// ./query_performance +// --index-queries-file indices.cypher +// --init-queries-file dataset.cypher +// --benchmark-queries-files expand.cypher,match.cypyher +// --use-v3=false +// +// Example usage with Memgraph v3 without MultiFrame: +// ./query_performance +// --split-file split_file +// --init-queries-file dataset.cypher +// --benchmark-queries-files expand.cypher,match.cypyher +// --use-v3=true +// --use-multi-frame=false +// +// Example usage with Memgraph v3 with MultiFrame: +// ./query_performance +// --split-file split_file +// --init-queries-file dataset.cypher +// --benchmark-queries-files expand.cypher,match.cypyher +// --use-v3=true +// --use-multi-frame=true +// +// The examples are using only the necessary flags, however specifying all of them is not a problem, so if you specify +// --index-queries-file for Memgraph v3, then it will be safely ignored just as --split-file for Memgraph v2. +// +// To generate flamegraph you can use the following command: +// flamegraph --cmd "record -F 997 --call-graph fp -g" --root -o flamegraph.svg -- ./query_performance +// Using the default option (dwarf) for --call-graph when calling perf might result in too long runtine of flamegraph +// because of address resolution. See https://github.com/flamegraph-rs/flamegraph/issues/74. + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +// v3 includes +#include "io/address.hpp" +#include "io/local_transport/local_system.hpp" +#include "io/message_histogram_collector.hpp" +#include "machine_manager/machine_manager.hpp" +#include "query/discard_value_stream.hpp" +#include "query/v2/discard_value_stream.hpp" +#include "query/v2/interpreter.hpp" +#include "query/v2/request_router.hpp" + +// v2 includes +#include "query/interpreter.hpp" +#include "storage/v2/storage.hpp" + +// common includes +#include "utils/string.hpp" + +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(index_queries_file, "", + "Path to the file which contains the queries to create indices. Used only for v2. Must contain an empty " + "line at the end of the file after the queries."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(split_file, "", + "Path to the split file which contains the predefined labels, properties, edge types and shard-ranges. " + "Used only for v3. Must contain an empty line at the end of the file."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(init_queries_file, "", + "Path to the file that is used to insert the initial dataset, one query per line. Must contain an empty " + "line at the end of the file after the queries."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(benchmark_queries_files, "", + "Comma separated paths to the files that contain the queries that we want to compare, one query per " + "line. Must contain an empty line at the end of each file after the queries."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(use_v3, true, "If set to true, then Memgraph v3 will be used, otherwise Memgraph v2 will be used."); + +DEFINE_string(export_json_results, "", "If not empty, then the results will be exported as a json file."); + +DEFINE_string(data_directory, "mg_data", "Path to directory to use as storage directory for Memgraph v2."); + +namespace memgraph::tests::manual { + +template +struct DependantTypes {}; + +template <> +struct DependantTypes { + using Interpreter = query::Interpreter; + using DiscardValueResultStream = query::DiscardValueResultStream; +}; + +template <> +struct DependantTypes { + using Interpreter = query::v2::Interpreter; + using DiscardValueResultStream = query::v2::DiscardValueResultStream; +}; + +template +void PutResult(nlohmann::json &json, const std::string_view name, std::chrono::duration duration) { + json[name] = std::chrono::duration_cast(duration).count(); +} + +template +using Interpreter = typename DependantTypes::Interpreter; + +template +using DiscardValueResultStream = typename DependantTypes::DiscardValueResultStream; + +template +void RunQueries(TInterpreterContext &interpreter_context, const std::vector &queries) { + Interpreter interpreter{&interpreter_context}; + DiscardValueResultStream stream; + + for (const auto &query : queries) { + auto result = interpreter.Prepare(query, {}, nullptr); + interpreter.Pull(&stream, std::nullopt, result.qid); + } +} + +template +void RunInitQueries(TInterpreterContext &interpreter_context, const std::vector &init_queries) { + RunQueries(interpreter_context, init_queries); +} + +template +void RunBenchmarkQueries(TInterpreterContext &interpreter_context, const std::vector &benchmark_queries) { + RunQueries(interpreter_context, benchmark_queries); +} + +std::vector ReadQueries(const std::string &file_name) { + std::vector queries{}; + std::string buffer; + + std::ifstream file{file_name, std::ios::in}; + MG_ASSERT(file.good(), "Cannot open queries file to read: {}", file_name); + while (file.good()) { + std::getline(file, buffer); + if (buffer.empty()) { + continue; + } + // Trim the trailing `;` + queries.push_back(buffer.substr(0, buffer.size() - 1)); + } + return queries; +} + +std::map> ReadBenchmarkQueries(const std::string benchmark_queries_files) { + auto benchmark_files = utils::Split(benchmark_queries_files, ","); + std::map> result; + for (const auto &benchmark_file : benchmark_files) { + const auto path = std::filesystem::path(benchmark_file); + result.emplace(path.stem().string(), ReadQueries(benchmark_file)); + } + return result; +} + +void RunV2() { + spdlog::critical("Running V2"); + const auto run_start = std::chrono::high_resolution_clock::now(); + + const auto index_queries = ReadQueries(FLAGS_index_queries_file); + const auto init_queries = ReadQueries(FLAGS_init_queries_file); + const auto benchmarks = ReadBenchmarkQueries(FLAGS_benchmark_queries_files); + + storage::Storage storage{ + storage::Config{.durability{.storage_directory = FLAGS_data_directory, + .snapshot_wal_mode = storage::Config::Durability::SnapshotWalMode::DISABLED}}}; + + memgraph::query::InterpreterContext interpreter_context{ + &storage, + {.query = {.allow_load_csv = false}, + .execution_timeout_sec = 0, + .replication_replica_check_frequency = std::chrono::seconds(0), + .default_kafka_bootstrap_servers = "", + .default_pulsar_service_url = "", + .stream_transaction_conflict_retries = 0, + .stream_transaction_retry_interval = std::chrono::milliseconds(0)}, + FLAGS_data_directory}; + + const auto init_start = std::chrono::high_resolution_clock::now(); + RunInitQueries(interpreter_context, index_queries); + RunInitQueries(interpreter_context, init_queries); + const auto benchmark_start = std::chrono::high_resolution_clock::now(); + + spdlog::critical("Read: {}ms", std::chrono::duration_cast(init_start - run_start).count()); + spdlog::critical("Init: {}ms", + std::chrono::duration_cast(benchmark_start - init_start).count()); + + std::map benchmark_results; + for (const auto &[name, queries] : benchmarks) { + const auto current_start = std::chrono::high_resolution_clock::now(); + RunBenchmarkQueries(interpreter_context, queries); + const auto current_stop = std::chrono::high_resolution_clock::now(); + const auto elapsed = current_stop - current_start; + spdlog::critical("Benchmark {}: {}ms", name, + std::chrono::duration_cast(elapsed).count()); + benchmark_results.emplace(name, elapsed); + } + + const auto benchmark_end = std::chrono::high_resolution_clock::now(); + spdlog::critical("Benchmark: {}ms", + std::chrono::duration_cast(benchmark_end - benchmark_start).count()); + + if (!FLAGS_export_json_results.empty()) { + nlohmann::json results; + PutResult(results, "init", benchmark_start - init_start); + nlohmann::json benchmark_results_json; + for (const auto &[name, duration] : benchmark_results) { + PutResult(benchmark_results_json, name, duration); + } + results["benchmarks"] = std::move(benchmark_results_json); + std::ofstream results_file{FLAGS_export_json_results}; + results_file << results.dump(); + } +} + +void RunV3() { + spdlog::critical("Running V3"); + const auto run_start = std::chrono::high_resolution_clock::now(); + std::ifstream sm_file{FLAGS_split_file, std::ios::in}; + MG_ASSERT(sm_file.good(), "Cannot open split file to read: {}", FLAGS_split_file); + auto sm = memgraph::coordinator::ShardMap::Parse(sm_file); + + const auto init_queries = ReadQueries(FLAGS_init_queries_file); + const auto benchmarks = ReadBenchmarkQueries(FLAGS_benchmark_queries_files); + + io::local_transport::LocalSystem ls; + + auto unique_local_addr_query = io::Address::UniqueLocalAddress(); + auto io = ls.Register(unique_local_addr_query); + + memgraph::machine_manager::MachineConfig config{ + .coordinator_addresses = std::vector{unique_local_addr_query}, + .is_storage = true, + .is_coordinator = true, + .listen_ip = unique_local_addr_query.last_known_ip, + .listen_port = unique_local_addr_query.last_known_port, + .shard_worker_threads = 2, + }; + + memgraph::coordinator::Coordinator coordinator{sm}; + + memgraph::machine_manager::MachineManager mm{io, config, coordinator}; + std::jthread mm_thread([&mm] { mm.Run(); }); + + auto rr_factory = std::make_unique(io); + + query::v2::InterpreterContext interpreter_context{(memgraph::storage::v3::Shard *)(nullptr), + {.execution_timeout_sec = 0}, + "data", + std::move(rr_factory), + mm.CoordinatorAddress()}; + + // without this it fails sometimes because the CreateVertices request might reach the shard worker faster than the + // ShardToInitialize + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto init_start = std::chrono::high_resolution_clock::now(); + RunInitQueries(interpreter_context, init_queries); + const auto benchmark_start = std::chrono::high_resolution_clock::now(); + + spdlog::critical("Read: {}ms", std::chrono::duration_cast(init_start - run_start).count()); + spdlog::critical("Init: {}ms", + std::chrono::duration_cast(benchmark_start - init_start).count()); + + std::map benchmark_results; + for (const auto &[name, queries] : benchmarks) { + const auto current_start = std::chrono::high_resolution_clock::now(); + RunBenchmarkQueries(interpreter_context, queries); + const auto current_stop = std::chrono::high_resolution_clock::now(); + const auto elapsed = current_stop - current_start; + spdlog::critical("Benchmark {}: {}ms", name, + std::chrono::duration_cast(elapsed).count()); + benchmark_results.emplace(name, elapsed); + } + + const auto benchmark_end = std::chrono::high_resolution_clock::now(); + spdlog::critical("Benchmark: {}ms", + std::chrono::duration_cast(benchmark_end - benchmark_start).count()); + + ls.ShutDown(); + auto latency_histograms = nlohmann::json::parse(fmt::format("{}", io.ResponseLatencies())); + spdlog::warn(latency_histograms.dump(4)); + + if (!FLAGS_export_json_results.empty()) { + nlohmann::json results; + PutResult(results, "init", benchmark_start - init_start); + nlohmann::json benchmark_results_json; + for (const auto &[name, duration] : benchmark_results) { + PutResult(benchmark_results_json, name, duration); + } + results["benchmarks"] = std::move(benchmark_results_json); + results["latencies"] = std::move(latency_histograms); + std::ofstream results_file{FLAGS_export_json_results}; + results_file << results.dump(); + } +} +} // namespace memgraph::tests::manual + +int main(int argc, char **argv) { + spdlog::set_level(spdlog::level::warn); + spdlog::cfg::load_env_levels(); + gflags::ParseCommandLineFlags(&argc, &argv, true); + if (FLAGS_use_v3) { + memgraph::tests::manual::RunV3(); + } else { + memgraph::tests::manual::RunV2(); + } + return 0; +} diff --git a/tests/manual/query_performance_runner.py b/tests/manual/query_performance_runner.py new file mode 100755 index 000000000..fa8b1b4b3 --- /dev/null +++ b/tests/manual/query_performance_runner.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 + +# Copyright 2023 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import argparse +import io +import json +import os +import subprocess +import tarfile +import tempfile + +import requests + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) +BUILD_DIR = os.path.join(PROJECT_DIR, "build") +BINARY_DIR = os.path.join(BUILD_DIR, "tests/manual") +DEFAULT_BENCHMARK_DIR = os.path.join(BINARY_DIR, "query_performance_benchmark") +DATA_URL = ( + "https://s3.eu-west-1.amazonaws.com/deps.memgraph.io/dataset/query_performance/query_performance_benchmark.tar.gz" +) + +parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument( + "--binary", + type=str, + default=os.path.join(BINARY_DIR, "query_performance"), + help="Path to the binary to use for the benchmark.", +) +parser.add_argument( + "--data-dir", + type=str, + default=tempfile.TemporaryDirectory().name, + help="Path to directory that can be used as a data directory for ", +) +parser.add_argument( + "--summary-path", + type=str, + default=os.path.join(DEFAULT_BENCHMARK_DIR, "summary.json"), + help="Path to which file write the summary.", +) + +parser.add_argument("--init-queries-file", type=str, default=os.path.join(DEFAULT_BENCHMARK_DIR, "dataset.cypher")) +parser.add_argument("--index-queries-file", type=str, default=os.path.join(DEFAULT_BENCHMARK_DIR, "indices.cypher")) +parser.add_argument("--split-file", type=str, default=os.path.join(DEFAULT_BENCHMARK_DIR, "split_file")) + +parser.add_argument( + "--benchmark-queries-files", + type=str, + default=",".join( + [os.path.join(DEFAULT_BENCHMARK_DIR, file_name) for file_name in ["expand.cypher", "match_files.cypher"]] + ), +) + +args = parser.parse_args() + +v2_results_path = os.path.join(DEFAULT_BENCHMARK_DIR, "v2_results.json") +v3_results_path = os.path.join(DEFAULT_BENCHMARK_DIR, "v3_results.json") + + +if os.path.exists(DEFAULT_BENCHMARK_DIR): + print(f"Using cachced data from {DEFAULT_BENCHMARK_DIR}") +else: + print(f"Downloading benchmark data to {DEFAULT_BENCHMARK_DIR}") + r = requests.get(DATA_URL) + assert r.ok, "Cannot download data" + file_like_object = io.BytesIO(r.content) + tar = tarfile.open(fileobj=file_like_object) + tar.extractall(os.path.dirname(DEFAULT_BENCHMARK_DIR)) + +subprocess.run( + [ + args.binary, + f"--split-file={args.split_file}", + f"--index-queries-file={args.index_queries_file}", + f"--init-queries-file={args.init_queries_file}", + f"--benchmark-queries-files={args.benchmark_queries_files}", + "--use-v3=false", + "--use-multi-frame=true", + f"--export-json-results={v2_results_path}", + f"--data-directory={args.data_dir}", + ] +) + +subprocess.run( + [ + args.binary, + f"--split-file={args.split_file}", + f"--index-queries-file={args.index_queries_file}", + f"--init-queries-file={args.init_queries_file}", + f"--benchmark-queries-files={args.benchmark_queries_files}", + "--use-v3=true", + "--use-multi-frame=true", + f"--export-json-results={v3_results_path}", + f"--data-directory={args.data_dir}", + ] +) + + +v2_results_file = open(v2_results_path) +v2_results = json.load(v2_results_file) +v3_results_file = open(v3_results_path) +v3_results = json.load(v3_results_file) + +with open(args.summary_path, "w") as summary: + json.dump({"v2": v2_results, "v3": v3_results}, summary) diff --git a/tests/mgbench/dataset_creator.py b/tests/mgbench/dataset_creator.py index 9ebeb8cd1..0b73eceed 100644 --- a/tests/mgbench/dataset_creator.py +++ b/tests/mgbench/dataset_creator.py @@ -51,10 +51,22 @@ import helpers def main(): parser = argparse.ArgumentParser() - parser.add_argument("--number_of_identities", type=int, default=10) - parser.add_argument("--number_of_files", type=int, default=10) - parser.add_argument("--percentage_of_permissions", type=float, default=1.0) - parser.add_argument("--filename", default="dataset.cypher") + parser.add_argument( + "--number_of_identities", + type=int, + default=10, + help="Determines how many :Identity nodes will the dataset contain.", + ) + parser.add_argument( + "--number_of_files", type=int, default=10, help="Determines how many :File nodes will the dataset contain." + ) + parser.add_argument( + "--percentage_of_permissions", + type=float, + default=1.0, + help="Determines approximately what percentage of the all possible identity-permission-file connections will be created.", + ) + parser.add_argument("--filename", default="dataset.cypher", help="The name of the output file.") args = parser.parse_args() diff --git a/tests/mgbench/dataset_creator_unwind.py b/tests/mgbench/dataset_creator_unwind.py index 564a4d018..00de1c7bf 100644 --- a/tests/mgbench/dataset_creator_unwind.py +++ b/tests/mgbench/dataset_creator_unwind.py @@ -51,10 +51,22 @@ import helpers def main(): parser = argparse.ArgumentParser() - parser.add_argument("--number_of_identities", type=int, default=10) - parser.add_argument("--number_of_files", type=int, default=10) - parser.add_argument("--percentage_of_permissions", type=float, default=1.0) - parser.add_argument("--filename", default="dataset.cypher") + parser.add_argument( + "--number_of_identities", + type=int, + default=10, + help="Determines how many :Identity nodes will the dataset contain.", + ) + parser.add_argument( + "--number_of_files", type=int, default=10, help="Determines how many :File nodes will the dataset contain." + ) + parser.add_argument( + "--percentage_of_permissions", + type=float, + default=1.0, + help="Determines approximately what percentage of the all possible identity-permission-file connections will be created.", + ) + parser.add_argument("--filename", default="dataset.cypher", help="The name of the output file.") args = parser.parse_args() diff --git a/tests/simulation/basic_request.cpp b/tests/simulation/basic_request.cpp index 868f4ac10..d6504a365 100644 --- a/tests/simulation/basic_request.cpp +++ b/tests/simulation/basic_request.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -55,7 +55,7 @@ void run_server(Io io) { highest_seen = std::max(highest_seen, req.proposal); auto srv_res = CounterResponse{highest_seen}; - io.Send(request_envelope.from_address, request_envelope.request_id, srv_res); + io.Send(request_envelope.from_address, request_envelope.request_id, std::move(srv_res)); } } @@ -76,7 +76,7 @@ std::pair RunWorkload(SimulatorConfig CounterRequest cli_req; cli_req.proposal = i; spdlog::info("[CLIENT] calling Request"); - auto res_f = cli_io.Request(srv_addr, cli_req); + auto res_f = cli_io.Request(srv_addr, std::move(cli_req)); spdlog::info("[CLIENT] calling Wait"); auto res_rez = std::move(res_f).Wait(); spdlog::info("[CLIENT] Wait returned"); diff --git a/tests/simulation/trial_query_storage/query_storage_test.cpp b/tests/simulation/trial_query_storage/query_storage_test.cpp index 8ef12bdb8..9e778f59d 100644 --- a/tests/simulation/trial_query_storage/query_storage_test.cpp +++ b/tests/simulation/trial_query_storage/query_storage_test.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -44,7 +44,7 @@ void run_server(Io io) { for (auto index = start_index; index < start_index + req.count; ++index) { response.vertices.push_back({std::string("Vertex_") + std::to_string(index)}); } - io.Send(request_envelope.from_address, request_envelope.request_id, response); + io.Send(request_envelope.from_address, request_envelope.request_id, std::move(response)); } } @@ -78,7 +78,7 @@ int main() { auto req = ScanVerticesRequest{2, std::nullopt}; - auto res_f = cli_io.Request(srv_addr, req); + auto res_f = cli_io.Request(srv_addr, std::move(req)); auto res_rez = std::move(res_f).Wait(); simulator.ShutDown(); return 0; diff --git a/tests/unit/local_transport.cpp b/tests/unit/local_transport.cpp index aa03325de..f2d3f8458 100644 --- a/tests/unit/local_transport.cpp +++ b/tests/unit/local_transport.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -48,7 +48,7 @@ void RunServer(Io io) { highest_seen = std::max(highest_seen, req.proposal); auto srv_res = CounterResponse{highest_seen}; - io.Send(request_envelope.from_address, request_envelope.request_id, srv_res); + io.Send(request_envelope.from_address, request_envelope.request_id, std::move(srv_res)); } } @@ -70,7 +70,7 @@ TEST(LocalTransport, BasicRequest) { auto value = 1; // i; cli_req.proposal = value; spdlog::info("[CLIENT] sending request"); - auto res_f = cli_io.Request(srv_addr, cli_req); + auto res_f = cli_io.Request(srv_addr, std::move(cli_req)); spdlog::info("[CLIENT] waiting on future"); auto res_rez = std::move(res_f).Wait();