diff --git a/src/expr/interpret/frame.hpp b/src/expr/interpret/frame.hpp index 72bfcf245..457806680 100644 --- a/src/expr/interpret/frame.hpp +++ b/src/expr/interpret/frame.hpp @@ -34,6 +34,7 @@ class Frame { const TypedValue &at(const Symbol &symbol) const { return elems_.at(symbol.position()); } auto &elems() { return elems_; } + const auto &elems() const { return elems_; } utils::MemoryResource *GetMemoryResource() const { return elems_.get_allocator().GetMemoryResource(); } diff --git a/src/io/local_transport/local_system.hpp b/src/io/local_transport/local_system.hpp index 2e54f8d75..7b0cda537 100644 --- a/src/io/local_transport/local_system.hpp +++ b/src/io/local_transport/local_system.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp index 8afc073af..4ff8e7ae0 100644 --- a/src/io/simulator/simulator.hpp +++ b/src/io/simulator/simulator.hpp @@ -41,7 +41,7 @@ class Simulator { Io<SimulatorTransport> Register(Address address) { std::uniform_int_distribution<uint64_t> seed_distrib; uint64_t seed = seed_distrib(rng_); - return Io{SimulatorTransport{simulator_handle_, address, seed}, address}; + return Io{SimulatorTransport(simulator_handle_, address, seed), address}; } void IncrementServerCountAndWaitForQuiescentState(Address address) { @@ -50,8 +50,12 @@ class Simulator { SimulatorStats Stats() { return simulator_handle_->Stats(); } + std::shared_ptr<SimulatorHandle> GetSimulatorHandle() const { return simulator_handle_; } + std::function<bool()> GetSimulatorTickClosure() { - std::function<bool()> tick_closure = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); }; + std::function<bool()> tick_closure = [handle_copy = simulator_handle_] { + return handle_copy->MaybeTickSimulator(); + }; return tick_closure; } }; diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 038cfeb03..1272a04a1 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -26,7 +26,7 @@ using memgraph::io::Time; class SimulatorTransport { std::shared_ptr<SimulatorHandle> simulator_handle_; - const Address address_; + Address address_; std::mt19937 rng_; public: @@ -36,7 +36,9 @@ class SimulatorTransport { template <Message RequestT, Message ResponseT> ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request, std::function<void()> notification, Duration timeout) { - std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); }; + std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] { + return handle_copy->MaybeTickSimulator(); + }; return simulator_handle_->template SubmitRequest<RequestT, ResponseT>( to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification)); diff --git a/src/memgraph.cpp b/src/memgraph.cpp index d825cc0e7..35fd20ad7 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -640,6 +640,8 @@ int main(int argc, char **argv) { memgraph::machine_manager::MachineManager<memgraph::io::local_transport::LocalTransport> mm{io, config, coordinator}; std::jthread mm_thread([&mm] { mm.Run(); }); + auto rr_factory = std::make_unique<memgraph::query::v2::LocalRequestRouterFactory>(io); + memgraph::query::v2::InterpreterContext interpreter_context{ (memgraph::storage::v3::Shard *)(nullptr), {.query = {.allow_load_csv = FLAGS_allow_load_csv}, @@ -650,7 +652,7 @@ int main(int argc, char **argv) { .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, .stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)}, FLAGS_data_directory, - std::move(io), + std::move(rr_factory), mm.CoordinatorAddress()}; SessionData session_data{&interpreter_context}; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 594942aec..5fcd433a1 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -64,7 +64,11 @@ #include "utils/tsc.hpp" #include "utils/variant_helpers.hpp" +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_bool(use_multi_frame, false, "Whether to use MultiFrame or not"); + namespace EventCounter { + extern Event ReadQuery; extern Event WriteQuery; extern Event ReadWriteQuery; @@ -74,6 +78,7 @@ extern const Event LabelPropertyIndexCreated; extern const Event StreamsCreated; extern const Event TriggersCreated; + } // namespace EventCounter namespace memgraph::query::v2 { @@ -688,7 +693,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), - multi_frame_(plan->symbol_table().max_position(), kNumberOfFramesInMultiframe, execution_memory), + multi_frame_(plan->symbol_table().max_position(), FLAGS_default_multi_frame_size, execution_memory), memory_limit_(memory_limit) { ctx_.db_accessor = dba; ctx_.symbol_table = plan->symbol_table(); @@ -704,7 +709,6 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par ctx_.request_router = request_router; ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc; } - std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStream *stream, std::optional<int> n, const std::vector<Symbol> &output_symbols, std::map<std::string, TypedValue> *summary) { @@ -732,10 +736,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea } // Returns true if a result was pulled. - const auto pull_result = [&]() -> bool { - cursor_->PullMultiple(multi_frame_, ctx_); - return !multi_frame_.HasInvalidFrame(); - }; + const auto pull_result = [&]() -> bool { return cursor_->PullMultiple(multi_frame_, ctx_); }; const auto stream_values = [&output_symbols, &stream](const Frame &frame) { // TODO: The streamed values should also probably use the above memory. @@ -755,13 +756,14 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea int i = 0; if (has_unsent_results_ && !output_symbols.empty()) { // stream unsent results from previous pull - - auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader(); - for (const auto &frame : iterator_for_valid_frame_only) { + for (auto &frame : multi_frame_.GetValidFramesConsumer()) { stream_values(frame); + frame.MakeInvalid(); ++i; + if (i == n) { + break; + } } - multi_frame_.MakeAllFramesInvalid(); } for (; !n || i < n;) { @@ -770,13 +772,17 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea } if (!output_symbols.empty()) { - auto iterator_for_valid_frame_only = multi_frame_.GetValidFramesReader(); - for (const auto &frame : iterator_for_valid_frame_only) { + for (auto &frame : multi_frame_.GetValidFramesConsumer()) { stream_values(frame); + frame.MakeInvalid(); ++i; + if (i == n) { + break; + } } + } else { + multi_frame_.MakeAllFramesInvalid(); } - multi_frame_.MakeAllFramesInvalid(); } // If we finished because we streamed the requested n results, @@ -811,8 +817,7 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n, const std::vector<Symbol> &output_symbols, std::map<std::string, TypedValue> *summary) { - auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple - if (should_pull_multiple) { + if (FLAGS_use_multi_frame) { return PullMultiple(stream, n, output_symbols, summary); } // Set up temporary memory for a single Pull. Initial memory comes from the @@ -906,34 +911,24 @@ using RWType = plan::ReadWriteTypeChecker::RWType; InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config, const std::filesystem::path & /*data_directory*/, - io::Io<io::local_transport::LocalTransport> io, + std::unique_ptr<RequestRouterFactory> request_router_factory, coordinator::Address coordinator_addr) - : db(db), config(config), io{std::move(io)}, coordinator_address{coordinator_addr} {} + : db(db), + config(config), + coordinator_address{coordinator_addr}, + request_router_factory_{std::move(request_router_factory)} {} Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); - // TODO(tyler) make this deterministic so that it can be tested. - auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; - auto query_io = interpreter_context_->io.ForkLocal(random_uuid); + request_router_ = + interpreter_context_->request_router_factory_->CreateRequestRouter(interpreter_context_->coordinator_address); - request_router_ = std::make_unique<RequestRouter<io::local_transport::LocalTransport>>( - coordinator::CoordinatorClient<io::local_transport::LocalTransport>( - query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), - std::move(query_io)); // Get edge ids - coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; - io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww; - ww.operation = requests; - auto resp = interpreter_context_->io - .Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>, - io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>( - interpreter_context_->coordinator_address, ww) - .Wait(); - if (resp.HasValue()) { - const auto alloc_edge_id_reps = - std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return); - interpreter_context_->edge_ids_alloc = {alloc_edge_id_reps.low, alloc_edge_id_reps.high}; + const auto edge_ids_alloc_min_max_pair = + request_router_->AllocateInitialEdgeIds(interpreter_context_->coordinator_address); + if (edge_ids_alloc_min_max_pair) { + interpreter_context_->edge_ids_alloc = {edge_ids_alloc_min_max_pair->first, edge_ids_alloc_min_max_pair->second}; } } diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index 985c9a90c..4efc85c22 100644 --- a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -16,7 +16,6 @@ #include "coordinator/coordinator.hpp" #include "coordinator/coordinator_client.hpp" -#include "io/local_transport/local_transport.hpp" #include "io/transport.hpp" #include "query/v2/auth_checker.hpp" #include "query/v2/bindings/cypher_main_visitor.hpp" @@ -172,7 +171,8 @@ struct PreparedQuery { struct InterpreterContext { explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config, const std::filesystem::path &data_directory, - io::Io<io::local_transport::LocalTransport> io, coordinator::Address coordinator_addr); + std::unique_ptr<RequestRouterFactory> request_router_factory, + coordinator::Address coordinator_addr); storage::v3::Shard *db; @@ -188,26 +188,24 @@ struct InterpreterContext { const InterpreterConfig config; IdAllocator edge_ids_alloc; - // TODO (antaljanosbenjamin) Figure out an abstraction for io::Io to make it possible to construct an interpreter - // context with a simulator transport without templatizing it. - io::Io<io::local_transport::LocalTransport> io; coordinator::Address coordinator_address; + std::unique_ptr<RequestRouterFactory> request_router_factory_; storage::v3::LabelId NameToLabelId(std::string_view label_name) { - return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name)); + return storage::v3::LabelId::FromUint(query_id_mapper_.NameToId(label_name)); } storage::v3::PropertyId NameToPropertyId(std::string_view property_name) { - return storage::v3::PropertyId::FromUint(query_id_mapper.NameToId(property_name)); + return storage::v3::PropertyId::FromUint(query_id_mapper_.NameToId(property_name)); } storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) { - return storage::v3::EdgeTypeId::FromUint(query_id_mapper.NameToId(edge_type_name)); + return storage::v3::EdgeTypeId::FromUint(query_id_mapper_.NameToId(edge_type_name)); } private: // TODO Replace with local map of labels, properties and edge type ids - storage::v3::NameIdMapper query_id_mapper; + storage::v3::NameIdMapper query_id_mapper_; }; /// Function that is used to tell all active interpreters that they should stop @@ -297,12 +295,15 @@ class Interpreter final { void Abort(); const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); } + void InstallSimulatorTicker(std::function<bool()> &&tick_simulator) { + request_router_->InstallSimulatorTicker(tick_simulator); + } private: struct QueryExecution { - std::optional<PreparedQuery> prepared_query; utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize}; utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory}; + std::optional<PreparedQuery> prepared_query; std::map<std::string, TypedValue> summary; std::vector<Notification> notifications; diff --git a/src/query/v2/multiframe.cpp b/src/query/v2/multiframe.cpp index 38cc7549a..8bbba08bf 100644 --- a/src/query/v2/multiframe.cpp +++ b/src/query/v2/multiframe.cpp @@ -17,6 +17,9 @@ #include "query/v2/bindings/frame.hpp" #include "utils/pmr/vector.hpp" +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_uint64(default_multi_frame_size, 100, "Default size of MultiFrame"); + namespace memgraph::query::v2 { static_assert(std::forward_iterator<ValidFramesReader::Iterator>); @@ -54,15 +57,17 @@ bool MultiFrame::HasInvalidFrame() const noexcept { // NOLINTNEXTLINE (bugprone-exception-escape) void MultiFrame::DefragmentValidFrames() noexcept { - /* - from: https://en.cppreference.com/w/cpp/algorithm/remove - "Removing is done by shifting (by means of copy assignment (until C++11)move assignment (since C++11)) the elements - in the range in such a way that the elements that are not to be removed appear in the beginning of the range. - Relative order of the elements that remain is preserved and the physical size of the container is unchanged." - */ - - // NOLINTNEXTLINE (bugprone-unused-return-value) - std::remove_if(frames_.begin(), frames_.end(), [](auto &frame) { return !frame.IsValid(); }); + static constexpr auto kIsValid = [](const FrameWithValidity &frame) { return frame.IsValid(); }; + static constexpr auto kIsInvalid = [](const FrameWithValidity &frame) { return !frame.IsValid(); }; + auto first_invalid_frame = std::find_if(frames_.begin(), frames_.end(), kIsInvalid); + auto following_first_valid = std::find_if(first_invalid_frame, frames_.end(), kIsValid); + while (first_invalid_frame != frames_.end() && following_first_valid != frames_.end()) { + std::swap(*first_invalid_frame, *following_first_valid); + first_invalid_frame++; + first_invalid_frame = std::find_if(first_invalid_frame, frames_.end(), kIsInvalid); + following_first_valid++; + following_first_valid = std::find_if(following_first_valid, frames_.end(), kIsValid); + } } ValidFramesReader MultiFrame::GetValidFramesReader() { return ValidFramesReader{*this}; } diff --git a/src/query/v2/multiframe.hpp b/src/query/v2/multiframe.hpp index b46be171a..2092ec4a2 100644 --- a/src/query/v2/multiframe.hpp +++ b/src/query/v2/multiframe.hpp @@ -13,10 +13,14 @@ #include <iterator> +#include <gflags/gflags.h> + #include "query/v2/bindings/frame.hpp" +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DECLARE_uint64(default_multi_frame_size); + namespace memgraph::query::v2 { -constexpr uint64_t kNumberOfFramesInMultiframe = 1000; // TODO have it configurable class ValidFramesConsumer; class ValidFramesModifier; @@ -33,6 +37,7 @@ class MultiFrame { MultiFrame(size_t size_of_frame, size_t number_of_frames, utils::MemoryResource *execution_memory); ~MultiFrame() = default; + // Assigning and moving the MultiFrame is not allowed if any accessor from the above ones are alive. MultiFrame(const MultiFrame &other); MultiFrame(MultiFrame &&other) noexcept; MultiFrame &operator=(const MultiFrame &other) = delete; @@ -97,9 +102,9 @@ class ValidFramesReader { ~ValidFramesReader() = default; ValidFramesReader(const ValidFramesReader &other) = delete; - ValidFramesReader(ValidFramesReader &&other) noexcept = delete; + ValidFramesReader(ValidFramesReader &&other) noexcept = default; ValidFramesReader &operator=(const ValidFramesReader &other) = delete; - ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = delete; + ValidFramesReader &operator=(ValidFramesReader &&other) noexcept = default; struct Iterator { using iterator_category = std::forward_iterator_tag; @@ -147,9 +152,9 @@ class ValidFramesModifier { ~ValidFramesModifier() = default; ValidFramesModifier(const ValidFramesModifier &other) = delete; - ValidFramesModifier(ValidFramesModifier &&other) noexcept = delete; + ValidFramesModifier(ValidFramesModifier &&other) noexcept = default; ValidFramesModifier &operator=(const ValidFramesModifier &other) = delete; - ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = delete; + ValidFramesModifier &operator=(ValidFramesModifier &&other) noexcept = default; struct Iterator { using iterator_category = std::forward_iterator_tag; @@ -202,9 +207,9 @@ class ValidFramesConsumer { ~ValidFramesConsumer() noexcept; ValidFramesConsumer(const ValidFramesConsumer &other) = delete; - ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete; + ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default; ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete; - ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete; + ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default; struct Iterator { using iterator_category = std::forward_iterator_tag; @@ -256,9 +261,9 @@ class InvalidFramesPopulator { ~InvalidFramesPopulator() = default; InvalidFramesPopulator(const InvalidFramesPopulator &other) = delete; - InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = delete; + InvalidFramesPopulator(InvalidFramesPopulator &&other) noexcept = default; InvalidFramesPopulator &operator=(const InvalidFramesPopulator &other) = delete; - InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = delete; + InvalidFramesPopulator &operator=(InvalidFramesPopulator &&other) noexcept = default; struct Iterator { using iterator_category = std::forward_iterator_tag; diff --git a/src/query/v2/plan/cost_estimator.hpp b/src/query/v2/plan/cost_estimator.hpp index 250274a73..f497d14d5 100644 --- a/src/query/v2/plan/cost_estimator.hpp +++ b/src/query/v2/plan/cost_estimator.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index c07d8d4aa..993d24282 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -38,6 +38,7 @@ #include "query/v2/db_accessor.hpp" #include "query/v2/exceptions.hpp" #include "query/v2/frontend/ast/ast.hpp" +#include "query/v2/multiframe.hpp" #include "query/v2/path.hpp" #include "query/v2/plan/scoped_profile.hpp" #include "query/v2/request_router.hpp" @@ -189,15 +190,19 @@ class DistributedCreateNodeCursor : public Cursor { return false; } - void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { + bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateNodeMF"); - input_cursor_->PullMultiple(multi_frame, context); + auto *request_router = context.request_router; + if (!input_cursor_->PullMultiple(multi_frame, context)) { + return false; + } { SCOPED_REQUEST_WAIT_PROFILE; request_router->CreateVertices(NodeCreationInfoToRequests(context, multi_frame)); } PlaceNodesOnTheMultiFrame(multi_frame, context); + return true; } void Shutdown() override { input_cursor_->Shutdown(); } @@ -213,6 +218,7 @@ class DistributedCreateNodeCursor : public Cursor { } std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { + primary_keys_.clear(); std::vector<msgs::NewVertex> requests; msgs::PrimaryKey pk; msgs::NewVertex rqst; @@ -222,22 +228,27 @@ class DistributedCreateNodeCursor : public Cursor { ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, storage::v3::View::NEW); if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) { - for (const auto &[key, value_expression] : *node_info_properties) { + for (const auto &[property, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); - if (context.request_router->IsPrimaryKey(primary_label, key)) { - rqst.primary_key.push_back(TypedValueToValue(val)); - pk.push_back(TypedValueToValue(val)); + auto msgs_value = TypedValueToValue(val); + if (context.request_router->IsPrimaryProperty(primary_label, property)) { + rqst.primary_key.push_back(msgs_value); + pk.push_back(std::move(msgs_value)); + } else { + rqst.properties.emplace_back(property, std::move(msgs_value)); } } } else { auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap(); - for (const auto &[key, value] : property_map) { - auto key_str = std::string(key); - auto property_id = context.request_router->NameToProperty(key_str); - if (context.request_router->IsPrimaryKey(primary_label, property_id)) { - rqst.primary_key.push_back(TypedValueToValue(value)); - pk.push_back(TypedValueToValue(value)); - } + for (const auto &[property, typed_value] : property_map) { + auto property_str = std::string(property); + auto property_id = context.request_router->NameToProperty(property_str); + auto msgs_value = TypedValueToValue(typed_value); + if (context.request_router->IsPrimaryProperty(primary_label, property_id)) { + rqst.primary_key.push_back(msgs_value); + pk.push_back(std::move(msgs_value)); + } else + rqst.properties.emplace_back(property_id, std::move(msgs_value)); } } @@ -263,6 +274,7 @@ class DistributedCreateNodeCursor : public Cursor { } std::vector<msgs::NewVertex> NodeCreationInfoToRequests(ExecutionContext &context, MultiFrame &multi_frame) { + primary_keys_.clear(); std::vector<msgs::NewVertex> requests; auto multi_frame_modifier = multi_frame.GetValidFramesModifier(); for (auto &frame : multi_frame_modifier) { @@ -275,22 +287,27 @@ class DistributedCreateNodeCursor : public Cursor { ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, storage::v3::View::NEW); if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info_.properties)) { - for (const auto &[key, value_expression] : *node_info_properties) { + for (const auto &[property, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); - if (context.request_router->IsPrimaryKey(primary_label, key)) { - rqst.primary_key.push_back(TypedValueToValue(val)); - pk.push_back(TypedValueToValue(val)); + auto msgs_value = TypedValueToValue(val); + if (context.request_router->IsPrimaryProperty(primary_label, property)) { + rqst.primary_key.push_back(msgs_value); + pk.push_back(std::move(msgs_value)); + } else { + rqst.properties.emplace_back(property, std::move(msgs_value)); } } } else { auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info_.properties)).ValueMap(); - for (const auto &[key, value] : property_map) { - auto key_str = std::string(key); - auto property_id = context.request_router->NameToProperty(key_str); - if (context.request_router->IsPrimaryKey(primary_label, property_id)) { - rqst.primary_key.push_back(TypedValueToValue(value)); - pk.push_back(TypedValueToValue(value)); - } + for (const auto &[property, typed_value] : property_map) { + auto property_str = std::string(property); + auto property_id = context.request_router->NameToProperty(property_str); + auto msgs_value = TypedValueToValue(typed_value); + if (context.request_router->IsPrimaryProperty(primary_label, property_id)) { + rqst.primary_key.push_back(msgs_value); + pk.push_back(std::move(msgs_value)); + } else + rqst.properties.emplace_back(property_id, std::move(msgs_value)); } } @@ -321,14 +338,16 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) { return false; } -void Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) { +bool Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) { SCOPED_PROFILE_OP("OnceMF"); if (!did_pull_) { auto &first_frame = multi_frame.GetFirstFrame(); first_frame.MakeValid(); did_pull_ = true; + return true; } + return false; } UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const { @@ -449,57 +468,116 @@ class DistributedScanAllAndFilterCursor : public Cursor { ResetExecutionState(); } - enum class State : int8_t { INITIALIZING, COMPLETED }; - using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { + bool MakeRequest(ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; std::optional<std::string> request_label = std::nullopt; if (label_.has_value()) { - request_label = request_router.LabelToName(*label_); + request_label = context.request_router->LabelToName(*label_); } - current_batch = request_router.ScanVertices(request_label); + current_batch_ = context.request_router->ScanVertices(request_label); } - current_vertex_it = current_batch.begin(); - request_state_ = State::COMPLETED; - return !current_batch.empty(); + current_vertex_it_ = current_batch_.begin(); + return !current_batch_.empty(); } bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - auto &request_router = *context.request_router; while (true) { if (MustAbort(context)) { throw HintedAbortError(); } - if (request_state_ == State::INITIALIZING) { - if (!input_cursor_->Pull(frame, context)) { + if (current_vertex_it_ == current_batch_.end()) { + ResetExecutionState(); + if (!input_cursor_->Pull(frame, context) || !MakeRequest(context)) { return false; } } - if (current_vertex_it == current_batch.end() && - (request_state_ == State::COMPLETED || !MakeRequest(request_router, context))) { - ResetExecutionState(); - continue; - } - - frame[output_symbol_] = TypedValue(std::move(*current_vertex_it)); - ++current_vertex_it; + frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_)); + ++current_vertex_it_; return true; } } + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + state_ = State::FetchVertices; + break; + } + case State::FetchVertices: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + if (!filter_expressions_->empty() || property_expression_pair_.has_value() || current_batch_.empty()) { + MakeRequest(context); + } else { + // We can reuse the vertices as they don't depend on any value from the frames + current_vertex_it_ = current_batch_.begin(); + } + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (current_vertex_it_ == current_batch_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::FetchVertices; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end(); + ++output_frame_it) { + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(*current_vertex_it_); + current_vertex_it_++; + populated_any = true; + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + }; + void Shutdown() override { input_cursor_->Shutdown(); } void ResetExecutionState() { - current_batch.clear(); - current_vertex_it = current_batch.end(); - request_state_ = State::INITIALIZING; + current_batch_.clear(); + current_vertex_it_ = current_batch_.end(); } void Reset() override { @@ -508,15 +586,20 @@ class DistributedScanAllAndFilterCursor : public Cursor { } private: + enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; - std::vector<VertexAccessor> current_batch; - std::vector<VertexAccessor>::iterator current_vertex_it; - State request_state_ = State::INITIALIZING; + std::vector<VertexAccessor> current_batch_; + std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()}; std::optional<storage::v3::LabelId> label_; std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_; std::optional<std::vector<Expression *>> filter_expressions_; + std::optional<MultiFrame> own_multi_frame_; + std::optional<ValidFramesConsumer> own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; class DistributedScanByPrimaryKeyCursor : public Cursor { @@ -532,8 +615,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { filter_expressions_(filter_expressions), primary_key_(primary_key) {} - enum class State : int8_t { INITIALIZING, COMPLETED }; - using VertexAccessor = accessors::VertexAccessor; std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router, @@ -566,6 +647,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return VertexAccessor(vertex, properties, &request_router); } + void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router, + ExecutionContext &context) { + msgs::GetPropertiesRequest req; + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + + std::unordered_set<msgs::VertexId> used_vertex_ids; + + for (auto &frame : multi_frame.GetValidFramesModifier()) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); + + std::vector<msgs::Value> pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + auto vertex_id = std::make_pair(label, std::move(pk)); + auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id)); + if (inserted) { + req.vertex_ids.emplace_back(*it); + } + } + + auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return request_router.GetProperties(req); + }); + + for (auto &result : get_prop_result) { + // TODO (gvolfing) figure out labels when relevant. + msgs::Vertex vertex = {.id = result.vertex, .labels = {}}; + + id_to_accessor_mapping_.emplace(result.vertex, + VertexAccessor(std::move(vertex), std::move(result.props), &request_router)); + } + } + bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); @@ -584,8 +702,88 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return false; } - void PullMultiple(MultiFrame & /*input_multi_frame*/, ExecutionContext & /*context*/) override { - throw utils::NotYetImplemented("Multiframe version of ScanByPrimaryKey is yet to be implemented."); + void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) { + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + } + + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + EnsureOwnMultiFrameIsGood(output_multi_frame); + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + id_to_accessor_mapping_.clear(); + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + + if (own_frames_it_ == own_frames_consumer_->end()) { + continue; + } + + MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context); + + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + if (own_frames_it_ == own_frames_consumer_->end()) { + id_to_accessor_mapping_.clear(); + } + return populated_any; + } + + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); + ++own_frames_it_) { + auto &output_frame = *output_frame_it; + + ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context, + context.request_router, storage::v3::View::NEW); + + std::vector<msgs::Value> pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + auto vertex_id = std::make_pair(label, std::move(pk)); + + if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) { + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(it->second); + populated_any = true; + ++output_frame_it; + } + own_frames_it_->MakeInvalid(); + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; }; void Reset() override { input_cursor_->Reset(); } @@ -593,12 +791,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { void Shutdown() override { input_cursor_->Shutdown(); } private: + enum class State { PullInput, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; storage::v3::LabelId label_; std::optional<std::vector<Expression *>> filter_expressions_; std::vector<Expression *> primary_key_; + std::optional<MultiFrame> own_multi_frame_; + std::optional<ValidFramesConsumer> own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; + std::unordered_map<msgs::VertexId, VertexAccessor> id_to_accessor_mapping_; }; ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view) @@ -606,8 +811,6 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy ACCEPT_WITH_INPUT(ScanAll) -class DistributedScanAllCursor; - UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::ScanAllOperator); @@ -854,6 +1057,27 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) { return false; } +bool Filter::FilterCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) { + SCOPED_PROFILE_OP("Filter"); + auto populated_any = false; + + while (multi_frame.HasInvalidFrame()) { + if (!input_cursor_->PullMultiple(multi_frame, context)) { + return populated_any; + } + for (auto &frame : multi_frame.GetValidFramesConsumer()) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); + if (!EvaluateFilter(evaluator, self_.expression_)) { + frame.MakeInvalid(); + } else { + populated_any = true; + } + } + } + return populated_any; +} + void Filter::FilterCursor::Shutdown() { input_cursor_->Shutdown(); } void Filter::FilterCursor::Reset() { input_cursor_->Reset(); } @@ -889,19 +1113,22 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) { // Produce should always yield the latest results. ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, storage::v3::View::NEW); - for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator); + for (auto *named_expr : self_.named_expressions_) named_expr->Accept(evaluator); return true; } return false; } -void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) { +bool Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) { SCOPED_PROFILE_OP("ProduceMF"); - input_cursor_->PullMultiple(multi_frame, context); + if (!input_cursor_->PullMultiple(multi_frame, context)) { + return false; + } auto iterator_for_valid_frame_only = multi_frame.GetValidFramesModifier(); + for (auto &frame : iterator_for_valid_frame_only) { // Produce should always yield the latest results. ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, @@ -911,7 +1138,9 @@ void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionCont named_expr->Accept(evaluator); } } -}; + + return true; +} void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); } @@ -936,6 +1165,8 @@ Delete::DeleteCursor::DeleteCursor(const Delete &self, utils::MemoryResource *me bool Delete::DeleteCursor::Pull(Frame & /*frame*/, ExecutionContext & /*context*/) { return false; } +bool Delete::DeleteCursor::PullMultiple(MultiFrame & /*multi_frame*/, ExecutionContext & /*context*/) { return false; } + void Delete::DeleteCursor::Shutdown() { input_cursor_->Shutdown(); } void Delete::DeleteCursor::Reset() { input_cursor_->Reset(); } @@ -1114,28 +1345,47 @@ bool ContainsSameEdge(const TypedValue &a, const TypedValue &b) { return a.ValueEdge() == b.ValueEdge(); } + +bool IsExpansionOk(Frame &frame, const Symbol &expand_symbol, const std::vector<Symbol> &previous_symbols) { + // This shouldn't raise a TypedValueException, because the planner + // makes sure these are all of the expected type. In case they are not + // an error should be raised long before this code is executed. + return std::ranges::all_of(previous_symbols, + [&frame, &expand_value = frame[expand_symbol]](const auto &previous_symbol) { + const auto &previous_value = frame[previous_symbol]; + return !ContainsSameEdge(previous_value, expand_value); + }); +} + } // namespace bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Pull(Frame &frame, ExecutionContext &context) { SCOPED_PROFILE_OP("EdgeUniquenessFilter"); - - auto expansion_ok = [&]() { - const auto &expand_value = frame[self_.expand_symbol_]; - for (const auto &previous_symbol : self_.previous_symbols_) { - const auto &previous_value = frame[previous_symbol]; - // This shouldn't raise a TypedValueException, because the planner - // makes sure these are all of the expected type. In case they are not - // an error should be raised long before this code is executed. - if (ContainsSameEdge(previous_value, expand_value)) return false; - } - return true; - }; - while (input_cursor_->Pull(frame, context)) - if (expansion_ok()) return true; + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) return true; return false; } +bool EdgeUniquenessFilter::EdgeUniquenessFilterCursor::PullMultiple(MultiFrame &output_multi_frame, + ExecutionContext &context) { + SCOPED_PROFILE_OP("EdgeUniquenessFilterMF"); + auto populated_any = false; + + while (output_multi_frame.HasInvalidFrame()) { + if (!input_cursor_->PullMultiple(output_multi_frame, context)) { + return populated_any; + } + for (auto &frame : output_multi_frame.GetValidFramesConsumer()) { + if (IsExpansionOk(frame, self_.expand_symbol_, self_.previous_symbols_)) { + populated_any = true; + } else { + frame.MakeInvalid(); + } + } + } + return populated_any; +} + void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Shutdown() { input_cursor_->Shutdown(); } void EdgeUniquenessFilter::EdgeUniquenessFilterCursor::Reset() { input_cursor_->Reset(); } @@ -1227,6 +1477,55 @@ class AggregateCursor : public Cursor { auto remember_values_it = aggregation_it_->second.remember_.begin(); for (const Symbol &remember_sym : self_.remember_) frame[remember_sym] = *remember_values_it++; + ++aggregation_it_; + return true; + } + + bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("AggregateMF"); + + if (!pulled_all_input_) { + ProcessAll(multi_frame, &context); + pulled_all_input_ = true; + MG_ASSERT(!multi_frame.HasValidFrame(), "ProcessAll didn't consumed all input frames!"); + aggregation_it_ = aggregation_.begin(); + + // in case there is no input and no group_bys we need to return true + // just this once + if (aggregation_.empty() && self_.group_by_.empty()) { + auto frame = multi_frame.GetFirstFrame(); + frame.MakeValid(); + auto *pull_memory = context.evaluation_context.memory; + // place default aggregation values on the frame + for (const auto &elem : self_.aggregations_) { + frame[elem.output_sym] = DefaultAggregationOpValue(elem, pull_memory); + } + // place null as remember values on the frame + for (const Symbol &remember_sym : self_.remember_) { + frame[remember_sym] = TypedValue(pull_memory); + } + return true; + } + } + + if (aggregation_it_ == aggregation_.end()) { + return false; + } + + // place aggregation values on the frame + auto &frame = multi_frame.GetFirstFrame(); + frame.MakeValid(); + auto aggregation_values_it = aggregation_it_->second.values_.begin(); + for (const auto &aggregation_elem : self_.aggregations_) { + frame[aggregation_elem.output_sym] = *aggregation_values_it++; + } + + // place remember values on the frame + auto remember_values_it = aggregation_it_->second.remember_.begin(); + for (const Symbol &remember_sym : self_.remember_) { + frame[remember_sym] = *remember_values_it++; + } + aggregation_it_++; return true; } @@ -1295,18 +1594,23 @@ class AggregateCursor : public Cursor { ProcessOne(*frame, &evaluator); } - // calculate AVG aggregations (so far they have only been summed) - for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { - if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; - for (auto &kv : aggregation_) { - AggregationValue &agg_value = kv.second; - auto count = agg_value.counts_[pos]; - auto *pull_memory = context->evaluation_context.memory; - if (count > 0) { - agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast<double>(count), pull_memory); - } + CalculateAverages(*context); + } + + void ProcessAll(MultiFrame &multi_frame, ExecutionContext *context) { + while (input_cursor_->PullMultiple(multi_frame, *context)) { + auto valid_frames_modifier = + multi_frame.GetValidFramesConsumer(); // consumer is needed i.o. reader because of the evaluator + + for (auto &frame : valid_frames_modifier) { + ExpressionEvaluator evaluator(&frame, context->symbol_table, context->evaluation_context, + context->request_router, storage::v3::View::NEW); + ProcessOne(frame, &evaluator); + frame.MakeInvalid(); } } + + CalculateAverages(*context); } /** @@ -1324,6 +1628,20 @@ class AggregateCursor : public Cursor { Update(evaluator, &agg_value); } + void CalculateAverages(ExecutionContext &context) { + for (size_t pos = 0; pos < self_.aggregations_.size(); ++pos) { + if (self_.aggregations_[pos].op != Aggregation::Op::AVG) continue; + for (auto &kv : aggregation_) { + AggregationValue &agg_value = kv.second; + auto count = agg_value.counts_[pos]; + auto *pull_memory = context.evaluation_context.memory; + if (count > 0) { + agg_value.values_[pos] = agg_value.values_[pos] / TypedValue(static_cast<double>(count), pull_memory); + } + } + } + } + /** Ensures the new AggregationValue has been initialized. This means * that the value vectors are filled with an appropriate number of Nulls, * counts are set to 0 and remember values are remembered. @@ -1357,7 +1675,7 @@ class AggregateCursor : public Cursor { for (; count_it < agg_value->counts_.end(); count_it++, value_it++, agg_elem_it++) { // COUNT(*) is the only case where input expression is optional // handle it here - auto input_expr_ptr = agg_elem_it->value; + auto *input_expr_ptr = agg_elem_it->value; if (!input_expr_ptr) { *count_it += 1; *value_it = *count_it; @@ -1448,7 +1766,7 @@ class AggregateCursor : public Cursor { /** Checks if the given TypedValue is legal in MIN and MAX. If not * an appropriate exception is thrown. */ - void EnsureOkForMinMax(const TypedValue &value) const { + static void EnsureOkForMinMax(const TypedValue &value) { switch (value.type()) { case TypedValue::Type::Bool: case TypedValue::Type::Int: @@ -1464,7 +1782,7 @@ class AggregateCursor : public Cursor { /** Checks if the given TypedValue is legal in AVG and SUM. If not * an appropriate exception is thrown. */ - void EnsureOkForAvgSum(const TypedValue &value) const { + static void EnsureOkForAvgSum(const TypedValue &value) { switch (value.type()) { case TypedValue::Type::Int: case TypedValue::Type::Double: @@ -1879,14 +2197,7 @@ class UnwindCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) return false; // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - storage::v3::View::OLD); - TypedValue input_value = self_.input_expression_->Accept(evaluator); - if (input_value.type() != TypedValue::Type::List) - throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); - // Copy the evaluted input_value_list to our vector. - input_value_ = input_value.ValueList(); - input_value_it_ = input_value_.begin(); + SetInputValue(frame, context); } // if we reached the end of our list of values goto back to top @@ -1897,6 +2208,70 @@ class UnwindCursor : public Cursor { } } + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("UnwindMF"); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + state_ = State::InitializeInputValue; + break; + } + case State::InitializeInputValue: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + SetInputValue(*own_frames_it_, context); + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (input_value_it_ == input_value_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::InitializeInputValue; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && input_value_it_ != input_value_.end(); + ++output_frame_it) { + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + output_frame[self_.output_symbol_] = std::move(*input_value_it_); + input_value_it_++; + populated_any = true; + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { @@ -1905,13 +2280,36 @@ class UnwindCursor : public Cursor { input_value_it_ = input_value_.end(); } + void SetInputValue(Frame &frame, ExecutionContext &context) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); + TypedValue input_value = self_.input_expression_->Accept(evaluator); + if (input_value.type() != TypedValue::Type::List) { + throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); + } + // It would be nice if we could move it, however it can be tricky to make it work because of allocators and + // different memory resources, be careful. + input_value_ = std::move(input_value.ValueList()); + input_value_it_ = input_value_.begin(); + } + private: + using InputVector = utils::pmr::vector<TypedValue>; + using InputIterator = InputVector::iterator; + const Unwind &self_; const UniqueCursorPtr input_cursor_; // typed values we are unwinding and yielding - utils::pmr::vector<TypedValue> input_value_; + InputVector input_value_; // current position in input_value_ - decltype(input_value_)::iterator input_value_it_ = input_value_.end(); + InputIterator input_value_it_ = input_value_.end(); + + enum class State { PullInput, InitializeInputValue, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; + std::optional<MultiFrame> own_multi_frame_; + std::optional<ValidFramesConsumer> own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; UniqueCursorPtr Unwind::MakeCursor(utils::MemoryResource *mem) const { @@ -2514,9 +2912,11 @@ class DistributedCreateExpandCursor : public Cursor { return true; } - void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { + bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateExpandMF"); - input_cursor_->PullMultiple(multi_frame, context); + if (!input_cursor_->PullMultiple(multi_frame, context)) { + return false; + } auto request_vertices = ExpandCreationInfoToRequests(multi_frame, context); { SCOPED_REQUEST_WAIT_PROFILE; @@ -2528,6 +2928,7 @@ class DistributedCreateExpandCursor : public Cursor { } } } + return true; } void Shutdown() override { input_cursor_->Shutdown(); } @@ -2571,27 +2972,16 @@ class DistributedCreateExpandCursor : public Cursor { const auto &v1 = v1_value.ValueVertex(); const auto &v2 = OtherVertex(frame); - // Set src and dest vertices - // TODO(jbajic) Currently we are only handling scenario where vertices - // are matched - const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) { - vertex_id.first = vertex.PrimaryLabel(); - for (const auto &[key, val] : vertex.Properties()) { - if (context.request_router->IsPrimaryKey(vertex_id.first.id, key)) { - vertex_id.second.push_back(val); - } - } - }; std::invoke([&]() { switch (edge_info.direction) { case EdgeAtom::Direction::IN: { - set_vertex(v2, request.src_vertex); - set_vertex(v1, request.dest_vertex); + request.src_vertex = v2.Id(); + request.dest_vertex = v1.Id(); break; } case EdgeAtom::Direction::OUT: { - set_vertex(v1, request.src_vertex); - set_vertex(v2, request.dest_vertex); + request.src_vertex = v1.Id(); + request.dest_vertex = v2.Id(); break; } case EdgeAtom::Direction::BOTH: @@ -2670,7 +3060,7 @@ class DistributedCreateExpandCursor : public Cursor { class DistributedExpandCursor : public Cursor { public: - explicit DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem) + DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem) : self_(self), input_cursor_(self.input_->MakeCursor(mem)), current_in_edge_it_(current_in_edges_.begin()), @@ -2707,16 +3097,15 @@ class DistributedExpandCursor : public Cursor { throw std::runtime_error("EdgeDirection Both not implemented"); } }; - msgs::ExpandOneRequest request; + + msgs::GetPropertiesRequest request; // to not fetch any properties of the edges - request.edge_properties.emplace(); - request.src_vertices.push_back(get_dst_vertex(edge, direction)); - request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; - auto result_rows = context.request_router->ExpandOne(std::move(request)); + request.vertex_ids.push_back(get_dst_vertex(edge, direction)); + auto result_rows = context.request_router->GetProperties(std::move(request)); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); - frame[self_.common_.node_symbol] = accessors::VertexAccessor( - msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router); + frame[self_.common_.node_symbol] = + accessors::VertexAccessor(msgs::Vertex{result_row.vertex}, result_row.props, context.request_router); } bool InitEdges(Frame &frame, ExecutionContext &context) { @@ -2734,6 +3123,9 @@ class DistributedExpandCursor : public Cursor { auto &vertex = vertex_value.ValueVertex(); msgs::ExpandOneRequest request; request.direction = DirectionToMsgsDirection(self_.common_.direction); + std::transform(self_.common_.edge_types.begin(), self_.common_.edge_types.end(), + std::back_inserter(request.edge_types), + [](const storage::v3::EdgeTypeId edge_type_id) { return msgs::EdgeType{edge_type_id}; }); // to not fetch any properties of the edges request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); @@ -2825,19 +3217,248 @@ class DistributedExpandCursor : public Cursor { } } + void InitEdgesMultiple() { + // This function won't work if any vertex id is duplicated in the input, because: + // 1. vertex_id_to_result_row is not a multimap + // 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input vertex on a + // later frame + const auto &frame = (*own_frames_it_); + const auto &vertex_value = frame[self_.input_symbol_]; + + if (vertex_value.IsNull()) { + ResetMultiFrameEdgeIts(); + return; + } + + ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); + const auto &vertex = vertex_value.ValueVertex(); + + current_vertex_ = &vertex; + + auto &ref_counted_result_row = vertex_id_to_result_row.at(vertex.Id()); + auto &result_row = *ref_counted_result_row.result_row; + + current_in_edge_mf_it_ = result_row.in_edges_with_specific_properties.begin(); + in_edges_end_it_ = result_row.in_edges_with_specific_properties.end(); + AdvanceUntilSuitableEdge(current_in_edge_mf_it_, in_edges_end_it_); + current_out_edge_mf_it_ = result_row.out_edges_with_specific_properties.begin(); + out_edges_end_it_ = result_row.out_edges_with_specific_properties.end(); + AdvanceUntilSuitableEdge(current_out_edge_mf_it_, out_edges_end_it_); + + if (ref_counted_result_row.ref_count == 1) { + vertex_id_to_result_row.erase(vertex.Id()); + } else { + ref_counted_result_row.ref_count--; + } + } + + bool PullInputFrames(ExecutionContext &context) { + const auto pulled_any = input_cursor_->PullMultiple(*own_multi_frame_, context); + // These needs to be updated regardless of the result of the pull, otherwise the consumer and iterator might + // get corrupted because of the operations done on our MultiFrame. + own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer(); + own_frames_it_ = own_frames_consumer_->begin(); + if (!pulled_any) { + return false; + } + + vertex_id_to_result_row.clear(); + + msgs::ExpandOneRequest request; + request.direction = DirectionToMsgsDirection(self_.common_.direction); + std::transform(self_.common_.edge_types.begin(), self_.common_.edge_types.end(), + std::back_inserter(request.edge_types), + [](const storage::v3::EdgeTypeId edge_type_id) { return msgs::EdgeType{edge_type_id}; }); + // to not fetch any properties of the edges + request.edge_properties.emplace(); + for (const auto &frame : own_multi_frame_->GetValidFramesReader()) { + const auto &vertex_value = frame[self_.input_symbol_]; + + // Null check due to possible failed optional match. + MG_ASSERT(!vertex_value.IsNull()); + + ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); + const auto &vertex = vertex_value.ValueVertex(); + auto [it, inserted] = vertex_id_to_result_row.try_emplace(vertex.Id(), RefCountedResultRow{1U, nullptr}); + + if (inserted) { + request.src_vertices.push_back(vertex.Id()); + } else { + it->second.ref_count++; + } + } + + result_rows_ = std::invoke([&context, &request]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return context.request_router->ExpandOne(std::move(request)); + }); + for (auto &row : result_rows_) { + vertex_id_to_result_row[row.src_vertex.id].result_row = &row; + } + + return true; + } + + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("DistributedExpandMF"); + EnsureOwnMultiFrameIsGood(output_multi_frame); + // A helper function for expanding a node from an edge. + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInputAndEdges: { + if (!PullInputFrames(context)) { + state_ = State::Exhausted; + return populated_any; + } + state_ = State::InitInOutEdgesIt; + break; + } + case State::InitInOutEdgesIt: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInputAndEdges; + } else { + InitEdgesMultiple(); + state_ = State::PopulateOutput; + } + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (current_in_edge_mf_it_ == in_edges_end_it_ && current_out_edge_mf_it_ == out_edges_end_it_) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::InitInOutEdgesIt; + continue; + } + auto populate_edges = [this, &context, &output_frames_populator, &populated_any]( + const EdgeAtom::Direction direction, EdgesIterator ¤t, + const EdgesIterator &end) { + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) { + auto &edge = *current; + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + switch (direction) { + case EdgeAtom::Direction::IN: { + output_frame[self_.common_.edge_symbol] = + EdgeAccessor{msgs::Edge{edge.other_end, current_vertex_->Id(), {}, {edge.gid}, edge.type}, + context.request_router}; + break; + } + case EdgeAtom::Direction::OUT: { + output_frame[self_.common_.edge_symbol] = + EdgeAccessor{msgs::Edge{current_vertex_->Id(), edge.other_end, {}, {edge.gid}, edge.type}, + context.request_router}; + break; + } + case EdgeAtom::Direction::BOTH: { + LOG_FATAL("Must indicate exact expansion direction here"); + } + }; + PullDstVertex(output_frame, context, direction); + ++current; + AdvanceUntilSuitableEdge(current, end); + populated_any = true; + } + }; + populate_edges(EdgeAtom::Direction::IN, current_in_edge_mf_it_, in_edges_end_it_); + populate_edges(EdgeAtom::Direction::OUT, current_out_edge_mf_it_, out_edges_end_it_); + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + } + + void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) { + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { input_cursor_->Reset(); + vertex_id_to_result_row.clear(); + result_rows_.clear(); + own_frames_it_ = ValidFramesConsumer::Iterator{}; + own_frames_consumer_.reset(); + own_multi_frame_->MakeAllFramesInvalid(); + state_ = State::PullInputAndEdges; + current_in_edges_.clear(); current_out_edges_.clear(); current_in_edge_it_ = current_in_edges_.end(); current_out_edge_it_ = current_out_edges_.end(); + + ResetMultiFrameEdgeIts(); } private: + enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted }; + + struct RefCountedResultRow { + size_t ref_count{0U}; + msgs::ExpandOneResultRow *result_row{nullptr}; + }; + + using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; + using EdgesVector = std::vector<EdgeWithSpecificProperties>; + using EdgesIterator = EdgesVector::iterator; + + void ResetMultiFrameEdgeIts() { + in_edges_end_it_ = EdgesIterator{}; + current_in_edge_mf_it_ = in_edges_end_it_; + out_edges_end_it_ = EdgesIterator{}; + current_out_edge_mf_it_ = out_edges_end_it_; + } + + void AdvanceUntilSuitableEdge(EdgesIterator ¤t, const EdgesIterator &end) { + if (!self_.common_.existing_node) { + return; + } + + const auto &existing_node_value = (*own_frames_it_)[self_.common_.node_symbol]; + if (existing_node_value.IsNull()) { + current = end; + return; + } + const auto &existing_node = existing_node_value.ValueVertex(); + current = std::find_if(current, end, [&existing_node](const EdgeWithSpecificProperties &edge) { + return edge.other_end == existing_node.Id(); + }); + } + const Expand &self_; const UniqueCursorPtr input_cursor_; + EdgesIterator current_in_edge_mf_it_; + EdgesIterator in_edges_end_it_; + EdgesIterator current_out_edge_mf_it_; + EdgesIterator out_edges_end_it_; + State state_{State::PullInputAndEdges}; + std::optional<MultiFrame> own_multi_frame_; + std::optional<ValidFramesConsumer> own_frames_consumer_; + const VertexAccessor *current_vertex_{nullptr}; + ValidFramesConsumer::Iterator own_frames_it_; + std::vector<msgs::ExpandOneResultRow> result_rows_; + // This won't work if any vertex id is duplicated in the input + std::unordered_map<msgs::VertexId, RefCountedResultRow> vertex_id_to_result_row; + + // TODO(antaljanosbenjamin): Remove when single frame approach is removed std::vector<EdgeAccessor> current_in_edges_; std::vector<EdgeAccessor> current_out_edges_; std::vector<EdgeAccessor>::iterator current_in_edge_it_; diff --git a/src/query/v2/plan/operator.lcp b/src/query/v2/plan/operator.lcp index e2d3eca56..110ba8a33 100644 --- a/src/query/v2/plan/operator.lcp +++ b/src/query/v2/plan/operator.lcp @@ -72,7 +72,21 @@ class Cursor { /// @throws QueryRuntimeException if something went wrong with execution virtual bool Pull(Frame &, ExecutionContext &) = 0; - virtual void PullMultiple(MultiFrame &, ExecutionContext &) { LOG_FATAL("PullMultipleIsNotImplemented"); } + /// Run an iteration of a @c LogicalOperator with MultiFrame. + /// + /// Since operators may be chained, the iteration may pull results from + /// multiple operators. + /// + /// @param MultiFrame May be read from or written to while performing the + /// iteration. + /// @param ExecutionContext Used to get the position of symbols in frame and + /// other information. + /// @return True if the operator was able to populate at least one Frame on the MultiFrame, + /// thus if an operator returns true, that means there is at least one valid Frame in the + /// MultiFrame. + /// + /// @throws QueryRuntimeException if something went wrong with execution + virtual bool PullMultiple(MultiFrame &, ExecutionContext &) {MG_ASSERT(false, "PullMultipleIsNotImplemented"); return false; } /// Resets the Cursor to its initial state. virtual void Reset() = 0; @@ -335,7 +349,7 @@ and false on every following Pull.") class OnceCursor : public Cursor { public: OnceCursor() {} - void PullMultiple(MultiFrame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; bool Pull(Frame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1162,6 +1176,7 @@ a boolean value.") public: FilterCursor(const Filter &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1213,7 +1228,7 @@ RETURN clause) the Produce's pull succeeds exactly once.") public: ProduceCursor(const Produce &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; - void PullMultiple(MultiFrame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1261,6 +1276,7 @@ Has a flag for using DETACH DELETE when deleting vertices.") public: DeleteCursor(const Delete &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; @@ -1554,6 +1570,7 @@ edge lists).") EdgeUniquenessFilterCursor(const EdgeUniquenessFilter &, utils::MemoryResource *); bool Pull(Frame &, ExecutionContext &) override; + bool PullMultiple(MultiFrame &, ExecutionContext &) override; void Shutdown() override; void Reset() override; diff --git a/src/query/v2/plan/rewrite/index_lookup.hpp b/src/query/v2/plan/rewrite/index_lookup.hpp index 17996d952..0b9b9cb97 100644 --- a/src/query/v2/plan/rewrite/index_lookup.hpp +++ b/src/query/v2/plan/rewrite/index_lookup.hpp @@ -597,6 +597,9 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor { [](const auto &schema_elem) { return schema_elem.property_id; }); for (const auto &property_filter : property_filters) { + if (property_filter.property_filter->type_ != PropertyFilter::Type::EQUAL) { + continue; + } const auto &property_id = db_->NameToProperty(property_filter.property_filter->property_.name); if (std::find(schema_properties.begin(), schema_properties.end(), property_id) != schema_properties.end()) { pk_temp.emplace_back(std::make_pair(property_filter.expression, property_filter)); diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index c175ba413..a8326d900 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -24,14 +24,18 @@ #include <stdexcept> #include <thread> #include <unordered_map> +#include <variant> #include <vector> +#include <boost/uuid/uuid.hpp> + #include "coordinator/coordinator.hpp" #include "coordinator/coordinator_client.hpp" #include "coordinator/coordinator_rsm.hpp" #include "coordinator/shard_map.hpp" #include "io/address.hpp" #include "io/errors.hpp" +#include "io/local_transport/local_transport.hpp" #include "io/notifier.hpp" #include "io/rsm/raft.hpp" #include "io/rsm/rsm_client.hpp" @@ -113,7 +117,10 @@ class RequestRouterInterface { virtual std::optional<storage::v3::EdgeTypeId> MaybeNameToEdgeType(const std::string &name) const = 0; virtual std::optional<storage::v3::LabelId> MaybeNameToLabel(const std::string &name) const = 0; virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0; - virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; + virtual bool IsPrimaryProperty(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; + + virtual std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) = 0; + virtual void InstallSimulatorTicker(std::function<bool()> tick_simulator) = 0; virtual const std::vector<coordinator::SchemaProperty> &GetSchemaForLabel(storage::v3::LabelId label) const = 0; }; @@ -139,7 +146,7 @@ class RequestRouter : public RequestRouterInterface { ~RequestRouter() override {} - void InstallSimulatorTicker(std::function<bool()> tick_simulator) { + void InstallSimulatorTicker(std::function<bool()> tick_simulator) override { notifier_.InstallSimulatorTicker(tick_simulator); } @@ -224,7 +231,7 @@ class RequestRouter : public RequestRouterInterface { return edge_types_.IdToName(id.AsUint()); } - bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const override { + bool IsPrimaryProperty(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const override { const auto schema_it = shards_map_.schemas.find(primary_label); MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint()); @@ -715,6 +722,23 @@ class RequestRouter : public RequestRouterInterface { edge_types_.StoreMapping(std::move(id_to_name)); } + std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) override { + coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; + + io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww; + ww.operation = requests; + auto resp = + io_.template Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>, + io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(coordinator_address, ww) + .Wait(); + if (resp.HasValue()) { + const auto alloc_edge_id_reps = + std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return); + return std::make_pair(alloc_edge_id_reps.low, alloc_edge_id_reps.high); + } + return {}; + } + ShardMap shards_map_; storage::v3::NameIdMapper properties_; storage::v3::NameIdMapper edge_types_; @@ -726,4 +750,66 @@ class RequestRouter : public RequestRouterInterface { io::Notifier notifier_ = {}; // TODO(kostasrim) Add batch prefetching }; + +class RequestRouterFactory { + public: + RequestRouterFactory() = default; + RequestRouterFactory(const RequestRouterFactory &) = delete; + RequestRouterFactory &operator=(const RequestRouterFactory &) = delete; + RequestRouterFactory(RequestRouterFactory &&) = delete; + RequestRouterFactory &operator=(RequestRouterFactory &&) = delete; + + virtual ~RequestRouterFactory() = default; + + virtual std::unique_ptr<RequestRouterInterface> CreateRequestRouter( + const coordinator::Address &coordinator_address) const = 0; +}; + +class LocalRequestRouterFactory : public RequestRouterFactory { + using LocalTransportIo = io::Io<io::local_transport::LocalTransport>; + LocalTransportIo &io_; + + public: + explicit LocalRequestRouterFactory(LocalTransportIo &io) : io_(io) {} + + std::unique_ptr<RequestRouterInterface> CreateRequestRouter( + const coordinator::Address &coordinator_address) const override { + using TransportType = io::local_transport::LocalTransport; + + auto query_io = io_.ForkLocal(boost::uuids::uuid{boost::uuids::random_generator()()}); + auto local_transport_io = io_.ForkLocal(boost::uuids::uuid{boost::uuids::random_generator()()}); + + return std::make_unique<RequestRouter<TransportType>>( + coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}), + std::move(local_transport_io)); + } +}; + +class SimulatedRequestRouterFactory : public RequestRouterFactory { + io::simulator::Simulator *simulator_; + + public: + explicit SimulatedRequestRouterFactory(io::simulator::Simulator &simulator) : simulator_(&simulator) {} + + std::unique_ptr<RequestRouterInterface> CreateRequestRouter( + const coordinator::Address &coordinator_address) const override { + using TransportType = io::simulator::SimulatorTransport; + auto actual_transport_handle = simulator_->GetSimulatorHandle(); + + boost::uuids::uuid random_uuid; + io::Address unique_local_addr_query; + + // The simulated RR should not introduce stochastic behavior. + random_uuid = boost::uuids::uuid{3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + unique_local_addr_query = {.unique_id = boost::uuids::uuid{4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}; + + auto io = simulator_->Register(unique_local_addr_query); + auto query_io = io.ForkLocal(random_uuid); + + return std::make_unique<RequestRouter<TransportType>>( + coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}), + std::move(io)); + } +}; + } // namespace memgraph::query::v2 diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 9a0fca2ce..f27094c02 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -26,6 +26,7 @@ #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/result.hpp" +#include "utils/fnv.hpp" namespace memgraph::msgs { @@ -590,3 +591,48 @@ using WriteResponses = std::variant<CreateVerticesResponse, DeleteVerticesRespon CreateExpandResponse, DeleteEdgesResponse, UpdateEdgesResponse, CommitResponse>; } // namespace memgraph::msgs + +namespace std { + +template <> +struct hash<memgraph::msgs::Value>; + +template <> +struct hash<memgraph::msgs::VertexId> { + size_t operator()(const memgraph::msgs::VertexId &id) const { + using LabelId = memgraph::storage::v3::LabelId; + using Value = memgraph::msgs::Value; + return memgraph::utils::HashCombine<LabelId, std::vector<Value>, std::hash<LabelId>, + memgraph::utils::FnvCollection<std::vector<Value>, Value>>{}(id.first.id, + id.second); + } +}; + +template <> +struct hash<memgraph::msgs::Value> { + size_t operator()(const memgraph::msgs::Value &value) const { + using Type = memgraph::msgs::Value::Type; + switch (value.type) { + case Type::Null: + return std::hash<size_t>{}(0U); + case Type::Bool: + return std::hash<bool>{}(value.bool_v); + case Type::Int64: + return std::hash<int64_t>{}(value.int_v); + case Type::Double: + return std::hash<double>{}(value.double_v); + case Type::String: + return std::hash<std::string>{}(value.string_v); + case Type::List: + LOG_FATAL("Add hash for lists"); + case Type::Map: + LOG_FATAL("Add hash for maps"); + case Type::Vertex: + LOG_FATAL("Add hash for vertices"); + case Type::Edge: + LOG_FATAL("Add hash for edges"); + } + } +}; + +} // namespace std diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 6b889fe16..f13c5a82e 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -321,12 +321,15 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); } using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; - EdgeWithAllProperties edges{ToMsgsVertexId(edge.From()), msgs::EdgeType{edge.EdgeType()}, edge.Gid().AsUint(), - std::move(value_properties)}; + if (is_in_edge) { - result_row.in_edges_with_all_properties.push_back(std::move(edges)); + result_row.in_edges_with_all_properties.push_back( + EdgeWithAllProperties{ToMsgsVertexId(edge.From()), msgs::EdgeType{edge.EdgeType()}, edge.Gid().AsUint(), + std::move(value_properties)}); } else { - result_row.out_edges_with_all_properties.push_back(std::move(edges)); + result_row.out_edges_with_all_properties.push_back( + EdgeWithAllProperties{ToMsgsVertexId(edge.To()), msgs::EdgeType{edge.EdgeType()}, edge.Gid().AsUint(), + std::move(value_properties)}); } return {}; }; @@ -346,12 +349,15 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); } using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; - EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.From()), msgs::EdgeType{edge.EdgeType()}, - edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { - result_row.in_edges_with_specific_properties.push_back(std::move(edges)); + result_row.in_edges_with_specific_properties.push_back( + EdgeWithSpecificProperties{ToMsgsVertexId(edge.From()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}); } else { - result_row.out_edges_with_specific_properties.push_back(std::move(edges)); + result_row.out_edges_with_specific_properties.push_back( + EdgeWithSpecificProperties{ToMsgsVertexId(edge.To()), msgs::EdgeType{edge.EdgeType()}, edge.Gid().AsUint(), + std::move(value_properties)}); } return {}; }; diff --git a/tests/mgbench/compare_results.py b/tests/mgbench/compare_results.py index 2179bb408..46da74270 100755 --- a/tests/mgbench/compare_results.py +++ b/tests/mgbench/compare_results.py @@ -14,7 +14,6 @@ import argparse import json - FIELDS = [ { "name": "throughput", @@ -85,39 +84,32 @@ def compare_results(results_from, results_to, fields): if group == "__import__": continue for scenario, summary_to in scenarios.items(): - summary_from = recursive_get( - results_from, dataset, variant, group, scenario, - value={}) - if len(summary_from) > 0 and \ - summary_to["count"] != summary_from["count"] or \ - summary_to["num_workers"] != \ - summary_from["num_workers"]: + summary_from = recursive_get(results_from, dataset, variant, group, scenario, value={}) + if ( + len(summary_from) > 0 + and summary_to["count"] != summary_from["count"] + or summary_to["num_workers"] != summary_from["num_workers"] + ): raise Exception("Incompatible results!") - testcode = "/".join([dataset, variant, group, scenario, - "{:02d}".format( - summary_to["num_workers"])]) + testcode = "/".join([dataset, variant, group, scenario, "{:02d}".format(summary_to["num_workers"])]) row = {} performance_changed = False for field in fields: key = field["name"] if key in summary_to: - row[key] = compute_diff( - summary_from.get(key, None), - summary_to[key]) + row[key] = compute_diff(summary_from.get(key, None), summary_to[key]) elif key in summary_to["database"]: row[key] = compute_diff( - recursive_get(summary_from, "database", key, - value=None), - summary_to["database"][key]) + recursive_get(summary_from, "database", key, value=None), summary_to["database"][key] + ) else: row[key] = compute_diff( - recursive_get(summary_from, "metadata", key, - "average", value=None), - summary_to["metadata"][key]["average"]) - if "diff" not in row[key] or \ - ("diff_treshold" in field and - abs(row[key]["diff"]) >= - field["diff_treshold"]): + recursive_get(summary_from, "metadata", key, "average", value=None), + summary_to["metadata"][key]["average"], + ) + if "diff" not in row[key] or ( + "diff_treshold" in field and abs(row[key]["diff"]) >= field["diff_treshold"] + ): performance_changed = True if performance_changed: ret[testcode] = row @@ -130,29 +122,36 @@ def generate_remarkup(fields, data): ret += "<table>\n" ret += " <tr>\n" ret += " <th>Testcode</th>\n" - ret += "\n".join(map(lambda x: " <th>{}</th>".format( - x["name"].replace("_", " ").capitalize()), fields)) + "\n" + ret += ( + "\n".join( + map( + lambda x: " <th>{}</th>".format(x["name"].replace("_", " ").capitalize()), + fields, + ) + ) + + "\n" + ) ret += " </tr>\n" for testcode in sorted(data.keys()): ret += " <tr>\n" ret += " <td>{}</td>\n".format(testcode) for field in fields: - result = data[testcode][field["name"]] - value = result["value"] * field["scaling"] - if "diff" in result: - diff = result["diff"] - arrow = "arrow-up" if diff >= 0 else "arrow-down" - if not (field["positive_diff_better"] ^ (diff >= 0)): - color = "green" + result = data[testcode].get(field["name"]) + if result != None: + value = result["value"] * field["scaling"] + if "diff" in result: + diff = result["diff"] + arrow = "arrow-up" if diff >= 0 else "arrow-down" + if not (field["positive_diff_better"] ^ (diff >= 0)): + color = "green" + else: + color = "red" + sign = "{{icon {} color={}}}".format(arrow, color) + ret += ' <td bgcolor="{}">{:.3f}{} ({:+.2%})</td>\n'.format( + color, value, field["unit"], diff + ) else: - color = "red" - sign = "{{icon {} color={}}}".format(arrow, color) - ret += " <td>{:.3f}{} //({:+.2%})// {}</td>\n".format( - value, field["unit"], diff, sign) - else: - ret += " <td>{:.3f}{} //(new)// " \ - "{{icon plus color=blue}}</td>\n".format( - value, field["unit"]) + ret += '<td bgcolor="blue">{:.3f}{} //(new)// </td>\n'.format(value, field["unit"]) ret += " </tr>\n" ret += "</table>\n" else: @@ -161,11 +160,14 @@ def generate_remarkup(fields, data): if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Compare results of multiple benchmark runs.") - parser.add_argument("--compare", action="append", nargs=2, - metavar=("from", "to"), - help="compare results between `from` and `to` files") + parser = argparse.ArgumentParser(description="Compare results of multiple benchmark runs.") + parser.add_argument( + "--compare", + action="append", + nargs=2, + metavar=("from", "to"), + help="compare results between `from` and `to` files", + ) parser.add_argument("--output", default="", help="output file name") args = parser.parse_args() diff --git a/tests/mgbench/dataset_creator_unwind.py b/tests/mgbench/dataset_creator_unwind.py new file mode 100644 index 000000000..564a4d018 --- /dev/null +++ b/tests/mgbench/dataset_creator_unwind.py @@ -0,0 +1,139 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import argparse +import random + +import helpers + +# Explaination of datasets: +# - empty_only_index: contains index; contains no data +# - small: contains index; contains data (small dataset) +# +# Datamodel is as follow: +# +# ┌──────────────┐ +# │ Permission │ +# ┌────────────────┐ │ Schema:uuid │ ┌────────────┐ +# │:IS_FOR_IDENTITY├────┤ Index:name ├───┤:IS_FOR_FILE│ +# └┬───────────────┘ └──────────────┘ └────────────┤ +# │ │ +# ┌──────▼──────────────┐ ┌──▼────────────────┐ +# │ Identity │ │ File │ +# │ Schema:uuid │ │ Schema:uuid │ +# │ Index:email │ │ Index:name │ +# └─────────────────────┘ │ Index:platformId │ +# └───────────────────┘ +# +# - File: attributes: ["uuid", "name", "platformId"] +# - Permission: attributes: ["uuid", "name"] +# - Identity: attributes: ["uuid", "email"] +# +# Indexes: +# - File: [File(uuid), File(platformId), File(name)] +# - Permission: [Permission(uuid), Permission(name)] +# - Identity: [Identity(uuid), Identity(email)] +# +# Edges: +# - (:Permission)-[:IS_FOR_FILE]->(:File) +# - (:Permission)-[:IS_FOR_IDENTITYR]->(:Identity) +# +# AccessControl specific: uuid is the schema + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--number_of_identities", type=int, default=10) + parser.add_argument("--number_of_files", type=int, default=10) + parser.add_argument("--percentage_of_permissions", type=float, default=1.0) + parser.add_argument("--filename", default="dataset.cypher") + + args = parser.parse_args() + + number_of_identities = args.number_of_identities + number_of_files = args.number_of_files + percentage_of_permissions = args.percentage_of_permissions + filename = args.filename + + assert number_of_identities >= 0 + assert number_of_files >= 0 + assert percentage_of_permissions > 0.0 and percentage_of_permissions <= 1.0 + assert filename != "" + + with open(filename, "w") as f: + f.write("MATCH (n) DETACH DELETE n;\n") + + # Create the indexes + f.write("CREATE INDEX ON :File;\n") + f.write("CREATE INDEX ON :Permission;\n") + f.write("CREATE INDEX ON :Identity;\n") + f.write("CREATE INDEX ON :File(platformId);\n") + f.write("CREATE INDEX ON :File(name);\n") + f.write("CREATE INDEX ON :Permission(name);\n") + f.write("CREATE INDEX ON :Identity(email);\n") + + # Create extra index: in distributed, this will be the schema + f.write("CREATE INDEX ON :File(uuid);\n") + f.write("CREATE INDEX ON :Permission(uuid);\n") + f.write("CREATE INDEX ON :Identity(uuid);\n") + + uuid = 1 + + # Create the nodes File + f.write("UNWIND [") + for index in range(0, number_of_files): + if index != 0: + f.write(",") + f.write(f' {{uuid: {uuid}, platformId: "platform_id", name: "name_file_{uuid}"}}') + uuid += 1 + f.write("] AS props CREATE (:File {uuid: props.uuid, platformId: props.platformId, name: props.name});\n") + + identities = [] + f.write("UNWIND [") + # Create the nodes Identity + for index in range(0, number_of_identities): + if index != 0: + f.write(",") + f.write(f' {{uuid: {uuid}, name: "mail_{uuid}@something.com"}}') + uuid += 1 + f.write("] AS props CREATE (:Identity {uuid: props.uuid, name: props.name});\n") + + f.write("UNWIND [") + created = 0 + for outer_index in range(0, number_of_files): + for inner_index in range(0, number_of_identities): + + file_uuid = outer_index + 1 + identity_uuid = number_of_files + inner_index + 1 + + if random.random() <= percentage_of_permissions: + + if created > 0: + f.write(",") + + f.write( + f' {{permUuid: {uuid}, permName: "name_permission_{uuid}", fileUuid: {file_uuid}, identityUuid: {identity_uuid}}}' + ) + created += 1 + uuid += 1 + + if created == 5000: + f.write( + "] AS props MATCH (file:File {uuid:props.fileUuid}), (identity:Identity {uuid: props.identityUuid}) CREATE (permission:Permission {uuid: props.permUuid, name: props.permName}) CREATE (permission)-[: IS_FOR_FILE]->(file) CREATE (permission)-[: IS_FOR_IDENTITY]->(identity);\nUNWIND [" + ) + created = 0 + f.write( + "] AS props MATCH (file:File {uuid:props.fileUuid}), (identity:Identity {uuid: props.identityUuid}) CREATE (permission:Permission {uuid: props.permUuid, name: props.permName}) CREATE (permission)-[: IS_FOR_FILE]->(file) CREATE (permission)-[: IS_FOR_IDENTITY]->(identity);\n" + ) + + +if __name__ == "__main__": + main() diff --git a/tests/mgbench/datasets.py b/tests/mgbench/datasets.py index 3a5806629..319b67c17 100644 --- a/tests/mgbench/datasets.py +++ b/tests/mgbench/datasets.py @@ -353,7 +353,7 @@ class AccessControl(Dataset): def benchmark__create__vertex(self): self.next_value_idx += 1 - query = (f"CREATE (:File {{uuid: {self.next_value_idx}}});", {}) + query = ("CREATE (:File {uuid: $uuid})", {"uuid": self.next_value_idx}) return query def benchmark__create__edges(self): @@ -379,6 +379,24 @@ class AccessControl(Dataset): return query def benchmark__match__match_all_vertices_with_edges(self): - self.next_value_idx += 1 query = ("MATCH (permission:Permission)-[e:IS_FOR_FILE]->(file:File) RETURN *", {}) return query + + def benchmark__match__match_users_with_permission_for_files(self): + file_uuid_1 = self._get_random_uuid("File") + file_uuid_2 = self._get_random_uuid("File") + min_file_uuid = min(file_uuid_1, file_uuid_2) + max_file_uuid = max(file_uuid_1, file_uuid_2) + query = ( + "MATCH (f:File)<-[ff:IS_FOR_FILE]-(p:Permission)-[fi:IS_FOR_IDENTITY]->(i:Identity) WHERE f.uuid >= $min_file_uuid AND f.uuid <= $max_file_uuid RETURN *", + {"min_file_uuid": min_file_uuid, "max_file_uuid": max_file_uuid}, + ) + return query + + def benchmark__match__match_users_with_permission_for_specific_file(self): + file_uuid = self._get_random_uuid("File") + query = ( + "MATCH (f:File {uuid: $file_uuid})<-[ff:IS_FOR_FILE]-(p:Permission)-[fi:IS_FOR_IDENTITY]->(i:Identity) RETURN *", + {"file_uuid": file_uuid}, + ) + return query diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py index 2b69a811f..cf89d7f67 100644 --- a/tests/mgbench/runners.py +++ b/tests/mgbench/runners.py @@ -68,6 +68,15 @@ class Memgraph: self._cleanup() atexit.unregister(self._cleanup) + # Returns None if string_value is not true or false, casing doesn't matter + def _get_bool_value(self, string_value): + lower_string_value = string_value.lower() + if lower_string_value == "true": + return True + if lower_string_value == "false": + return False + return None + def _get_args(self, **kwargs): data_directory = os.path.join(self._directory.name, "memgraph") if self._memgraph_version >= (0, 50, 0): @@ -83,7 +92,13 @@ class Memgraph: args_list = self._extra_args.split(" ") assert len(args_list) % 2 == 0 for i in range(0, len(args_list), 2): - kwargs[args_list[i]] = args_list[i + 1] + key = args_list[i] + value = args_list[i + 1] + maybe_bool_value = self._get_bool_value(value) + if maybe_bool_value is not None: + kwargs[key] = maybe_bool_value + else: + kwargs[key] = value return _convert_args_to_flags(self._memgraph_binary, **kwargs) diff --git a/tests/mgbench/splitfiles/accesscontrol_large.shard_configuration b/tests/mgbench/splitfiles/accesscontrol_large.shard_configuration index 34dca66be..d2138ec93 100644 --- a/tests/mgbench/splitfiles/accesscontrol_large.shard_configuration +++ b/tests/mgbench/splitfiles/accesscontrol_large.shard_configuration @@ -1,8 +1,12 @@ -4 +8 uuid email name platformId +permUuid +permName +fileUuid +identityUuid 2 IS_FOR_IDENTITY IS_FOR_FILE diff --git a/tests/mgbench/splitfiles/accesscontrol_medium.shard_configuration b/tests/mgbench/splitfiles/accesscontrol_medium.shard_configuration index a807e783f..f05ee8993 100644 --- a/tests/mgbench/splitfiles/accesscontrol_medium.shard_configuration +++ b/tests/mgbench/splitfiles/accesscontrol_medium.shard_configuration @@ -1,8 +1,12 @@ -4 +8 uuid email name platformId +permUuid +permName +fileUuid +identityUuid 2 IS_FOR_IDENTITY IS_FOR_FILE diff --git a/tests/mgbench/splitfiles/accesscontrol_small.shard_configuration b/tests/mgbench/splitfiles/accesscontrol_small.shard_configuration index 9c11b6258..2cce1ccef 100644 --- a/tests/mgbench/splitfiles/accesscontrol_small.shard_configuration +++ b/tests/mgbench/splitfiles/accesscontrol_small.shard_configuration @@ -1,8 +1,12 @@ -4 +8 uuid email name platformId +permUuid +permName +fileUuid +identityUuid 2 IS_FOR_IDENTITY IS_FOR_FILE diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index cd5fc0a4a..f3d4870f0 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -17,7 +17,7 @@ function(add_simulation_test test_cpp) # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) - target_link_libraries(${target_name} mg-storage-v3 mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2) + target_link_libraries(${target_name} mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2 mg-storage-v3) target_link_libraries(${target_name} Boost::headers) target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest) @@ -32,4 +32,5 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp) add_simulation_test(sharded_map.cpp) add_simulation_test(shard_rsm.cpp) add_simulation_test(cluster_property_test.cpp) +add_simulation_test(cluster_property_test_cypher_queries.cpp) add_simulation_test(request_router.cpp) diff --git a/tests/simulation/cluster_property_test_cypher_queries.cpp b/tests/simulation/cluster_property_test_cypher_queries.cpp new file mode 100644 index 000000000..e35edc033 --- /dev/null +++ b/tests/simulation/cluster_property_test_cypher_queries.cpp @@ -0,0 +1,64 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// This test serves as an example of a property-based model test. +// It generates a cluster configuration and a set of operations to +// apply against both the real system and a greatly simplified model. + +#include <chrono> + +#include <gtest/gtest.h> +#include <rapidcheck.h> +#include <rapidcheck/gtest.h> +#include <spdlog/cfg/env.h> + +#include "generated_operations.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/time.hpp" +#include "storage/v3/shard_manager.hpp" +#include "test_cluster.hpp" + +namespace memgraph::tests::simulation { + +using io::Duration; +using io::Time; +using io::simulator::SimulatorConfig; +using storage::v3::kMaximumCronInterval; + +RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) { + spdlog::cfg::load_env_levels(); + + SimulatorConfig sim_config{ + .drop_percent = 0, + .perform_timeouts = false, + .scramble_messages = true, + .rng_seed = rng_seed, + .start_time = Time::min(), + .abort_time = Time::max(), + }; + + std::vector<std::string> queries = {"CREATE (n:test_label{property_1: 0, property_2: 0});", "MATCH (n) RETURN n;"}; + + auto [sim_stats_1, latency_stats_1] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries); + auto [sim_stats_2, latency_stats_2] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries); + + if (latency_stats_1 != latency_stats_2) { + spdlog::error("simulator stats diverged across runs"); + spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + RC_ASSERT(latency_stats_1 == latency_stats_2); + RC_ASSERT(sim_stats_1 == sim_stats_2); + } +} + +} // namespace memgraph::tests::simulation diff --git a/tests/simulation/simulation_interpreter.hpp b/tests/simulation/simulation_interpreter.hpp new file mode 100644 index 000000000..e83980787 --- /dev/null +++ b/tests/simulation/simulation_interpreter.hpp @@ -0,0 +1,93 @@ +// Copyright 2023 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include "io/simulator/simulator_handle.hpp" +#include "machine_manager/machine_config.hpp" +#include "machine_manager/machine_manager.hpp" +#include "query/v2/config.hpp" +#include "query/v2/discard_value_stream.hpp" +#include "query/v2/frontend/ast/ast.hpp" +#include "query/v2/interpreter.hpp" +#include "query/v2/request_router.hpp" + +#include <string> +#include <vector> + +// TODO(gvolfing) +// -How to set up the entire raft cluster with the QE. Also provide abrstraction for that. +// -Pass an argument to the setup to determine, how many times the retry of a query should happen. + +namespace memgraph::io::simulator { + +class SimulatedInterpreter { + using ResultStream = query::v2::DiscardValueResultStream; + + public: + explicit SimulatedInterpreter(std::unique_ptr<query::v2::InterpreterContext> interpreter_context) + : interpreter_context_(std::move(interpreter_context)) { + interpreter_ = std::make_unique<memgraph::query::v2::Interpreter>(interpreter_context_.get()); + } + + SimulatedInterpreter(const SimulatedInterpreter &) = delete; + SimulatedInterpreter &operator=(const SimulatedInterpreter &) = delete; + SimulatedInterpreter(SimulatedInterpreter &&) = delete; + SimulatedInterpreter &operator=(SimulatedInterpreter &&) = delete; + ~SimulatedInterpreter() = default; + + void InstallSimulatorTicker(Simulator &simulator) { + interpreter_->InstallSimulatorTicker(simulator.GetSimulatorTickClosure()); + } + + std::vector<ResultStream> RunQueries(const std::vector<std::string> &queries) { + std::vector<ResultStream> results; + results.reserve(queries.size()); + + for (const auto &query : queries) { + results.emplace_back(RunQuery(query)); + } + return results; + } + + private: + ResultStream RunQuery(const std::string &query) { + ResultStream stream; + + std::map<std::string, memgraph::storage::v3::PropertyValue> params; + const std::string *username = nullptr; + + interpreter_->Prepare(query, params, username); + interpreter_->PullAll(&stream); + + return stream; + } + + std::unique_ptr<query::v2::InterpreterContext> interpreter_context_; + std::unique_ptr<query::v2::Interpreter> interpreter_; +}; + +SimulatedInterpreter SetUpInterpreter(Address coordinator_address, Simulator &simulator) { + auto rr_factory = std::make_unique<memgraph::query::v2::SimulatedRequestRouterFactory>(simulator); + + auto interpreter_context = std::make_unique<memgraph::query::v2::InterpreterContext>( + nullptr, + memgraph::query::v2::InterpreterConfig{.query = {.allow_load_csv = true}, + .execution_timeout_sec = 600, + .replication_replica_check_frequency = std::chrono::seconds(1), + .default_kafka_bootstrap_servers = "", + .default_pulsar_service_url = "", + .stream_transaction_conflict_retries = 30, + .stream_transaction_retry_interval = std::chrono::milliseconds(500)}, + std::filesystem::path("mg_data"), std::move(rr_factory), coordinator_address); + + return SimulatedInterpreter(std::move(interpreter_context)); +} + +} // namespace memgraph::io::simulator diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 791b45faa..f10e88e61 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -36,6 +36,8 @@ #include "utils/print_helpers.hpp" #include "utils/variant_helpers.hpp" +#include "simulation_interpreter.hpp" + namespace memgraph::tests::simulation { using coordinator::Coordinator; @@ -279,4 +281,65 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const return std::make_pair(stats, histo); } +std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulationWithQueries( + const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, const std::vector<std::string> &queries) { + spdlog::info("========================== NEW SIMULATION =========================="); + + auto simulator = Simulator(sim_config); + + auto machine_1_addr = Address::TestAddress(1); + auto cli_addr = Address::TestAddress(2); + auto cli_addr_2 = Address::TestAddress(3); + + Io<SimulatorTransport> cli_io = simulator.Register(cli_addr); + Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2); + + auto coordinator_addresses = std::vector{ + machine_1_addr, + }; + + ShardMap initialization_sm = TestShardMap(cluster_config.shards - 1, cluster_config.replication_factor); + + auto mm_1 = MkMm(simulator, coordinator_addresses, machine_1_addr, initialization_sm); + Address coordinator_address = mm_1.CoordinatorAddress(); + + auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); + simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr); + + auto detach_on_error = DetachIfDropped{.handle = mm_thread_1}; + + // TODO(tyler) clarify addresses of coordinator etc... as it's a mess + + CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address}); + WaitForShardsToInitialize(coordinator_client); + + auto simulated_interpreter = io::simulator::SetUpInterpreter(coordinator_address, simulator); + simulated_interpreter.InstallSimulatorTicker(simulator); + + auto query_results = simulated_interpreter.RunQueries(queries); + + // We have now completed our workload without failing any assertions, so we can + // disable detaching the worker thread, which will cause the mm_thread_1 jthread + // to be joined when this function returns. + detach_on_error.detach = false; + + simulator.ShutDown(); + + mm_thread_1.join(); + + SimulatorStats stats = simulator.Stats(); + + spdlog::info("total messages: {}", stats.total_messages); + spdlog::info("dropped messages: {}", stats.dropped_messages); + spdlog::info("timed out requests: {}", stats.timed_out_requests); + spdlog::info("total requests: {}", stats.total_requests); + spdlog::info("total responses: {}", stats.total_responses); + spdlog::info("simulator ticks: {}", stats.simulator_ticks); + + auto histo = cli_io_2.ResponseLatencies(); + + spdlog::info("========================== SUCCESS :) =========================="); + return std::make_pair(stats, histo); +} + } // namespace memgraph::tests::simulation diff --git a/tests/unit/mock_helpers.hpp b/tests/unit/mock_helpers.hpp index b7d764ac8..5ce73538a 100644 --- a/tests/unit/mock_helpers.hpp +++ b/tests/unit/mock_helpers.hpp @@ -41,7 +41,9 @@ class MockedRequestRouter : public RequestRouterInterface { MOCK_METHOD(std::optional<storage::v3::EdgeTypeId>, MaybeNameToEdgeType, (const std::string &), (const)); MOCK_METHOD(std::optional<storage::v3::LabelId>, MaybeNameToLabel, (const std::string &), (const)); MOCK_METHOD(bool, IsPrimaryLabel, (storage::v3::LabelId), (const)); - MOCK_METHOD(bool, IsPrimaryKey, (storage::v3::LabelId, storage::v3::PropertyId), (const)); + MOCK_METHOD(bool, IsPrimaryProperty, (storage::v3::LabelId, storage::v3::PropertyId), (const)); + MOCK_METHOD((std::optional<std::pair<uint64_t, uint64_t>>), AllocateInitialEdgeIds, (io::Address)); + MOCK_METHOD(void, InstallSimulatorTicker, (std::function<bool()>)); MOCK_METHOD(const std::vector<coordinator::SchemaProperty> &, GetSchemaForLabel, (storage::v3::LabelId), (const)); }; @@ -59,7 +61,7 @@ class MockedLogicalOperator : public plan::LogicalOperator { class MockedCursor : public plan::Cursor { public: MOCK_METHOD(bool, Pull, (Frame &, expr::ExecutionContext &)); - MOCK_METHOD(void, PullMultiple, (MultiFrame &, expr::ExecutionContext &)); + MOCK_METHOD(bool, PullMultiple, (MultiFrame &, expr::ExecutionContext &)); MOCK_METHOD(void, Reset, ()); MOCK_METHOD(void, Shutdown, ()); }; diff --git a/tests/unit/query_v2_common.hpp b/tests/unit/query_v2_common.hpp index 85905cb22..1b9d6807c 100644 --- a/tests/unit/query_v2_common.hpp +++ b/tests/unit/query_v2_common.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source diff --git a/tests/unit/query_v2_create_node_multiframe.cpp b/tests/unit/query_v2_create_node_multiframe.cpp index b298d2781..a2d5b161d 100644 --- a/tests/unit/query_v2_create_node_multiframe.cpp +++ b/tests/unit/query_v2_create_node_multiframe.cpp @@ -58,7 +58,7 @@ TEST(CreateNodeTest, CreateNodeCursor) { MockedRequestRouter router; EXPECT_CALL(router, CreateVertices(_)).Times(1).WillOnce(Return(std::vector<msgs::CreateVerticesResponse>{})); EXPECT_CALL(router, IsPrimaryLabel(_)).WillRepeatedly(Return(true)); - EXPECT_CALL(router, IsPrimaryKey(_, _)).WillRepeatedly(Return(true)); + EXPECT_CALL(router, IsPrimaryProperty(_, _)).WillRepeatedly(Return(true)); auto context = MakeContext(ast, symbol_table, &router, &id_alloc); auto multi_frame = CreateMultiFrame(context.symbol_table.max_position()); cursor->PullMultiple(multi_frame, context); diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index b941d3463..6b1c23816 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -123,8 +123,13 @@ class MockedRequestRouter : public RequestRouterInterface { bool IsPrimaryLabel(LabelId label) const override { return true; } - bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { return true; } + bool IsPrimaryProperty(LabelId primary_label, PropertyId property) const override { return true; } + std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) override { + return {}; + } + + void InstallSimulatorTicker(std::function<bool()> tick_simulator) override {} const std::vector<coordinator::SchemaProperty> &GetSchemaForLabel(storage::v3::LabelId /*label*/) const override { static std::vector<coordinator::SchemaProperty> schema; return schema; diff --git a/tests/unit/query_v2_plan.cpp b/tests/unit/query_v2_plan.cpp index 8fc5e11c7..c782f9127 100644 --- a/tests/unit/query_v2_plan.cpp +++ b/tests/unit/query_v2_plan.cpp @@ -86,7 +86,7 @@ class TestPlanner : public ::testing::Test {}; using PlannerTypes = ::testing::Types<Planner>; -TYPED_TEST_CASE(TestPlanner, PlannerTypes); +TYPED_TEST_SUITE(TestPlanner, PlannerTypes); TYPED_TEST(TestPlanner, MatchFilterPropIsNotNull) { const char *prim_label_name = "prim_label_one";