diff --git a/src/io/local_transport/local_system.hpp b/src/io/local_transport/local_system.hpp index 2e54f8d75..ec148b50b 100644 --- a/src/io/local_transport/local_system.hpp +++ b/src/io/local_transport/local_system.hpp @@ -29,6 +29,8 @@ class LocalSystem { return Io{local_transport, address}; } + std::shared_ptr &GetTransportHandle() { return local_transport_handle_; } + void ShutDown() { local_transport_handle_->ShutDown(); } }; diff --git a/src/io/simulator/simulator.hpp b/src/io/simulator/simulator.hpp index 622c264b4..5095ab58b 100644 --- a/src/io/simulator/simulator.hpp +++ b/src/io/simulator/simulator.hpp @@ -41,7 +41,7 @@ class Simulator { Io Register(Address address) { std::uniform_int_distribution seed_distrib; uint64_t seed = seed_distrib(rng_); - return Io{SimulatorTransport{simulator_handle_, address, seed}, address}; + return Io{SimulatorTransport(simulator_handle_, address, seed), address}; } void IncrementServerCountAndWaitForQuiescentState(Address address) { @@ -49,5 +49,7 @@ class Simulator { } SimulatorStats Stats() { return simulator_handle_->Stats(); } + + std::shared_ptr GetSimulatorHandle() { return simulator_handle_; } }; }; // namespace memgraph::io::simulator diff --git a/src/io/simulator/simulator_transport.hpp b/src/io/simulator/simulator_transport.hpp index 5e5a24aa9..492b59d3a 100644 --- a/src/io/simulator/simulator_transport.hpp +++ b/src/io/simulator/simulator_transport.hpp @@ -25,7 +25,7 @@ using memgraph::io::Time; class SimulatorTransport { std::shared_ptr simulator_handle_; - const Address address_; + Address address_; std::mt19937 rng_; public: diff --git a/src/memgraph.cpp b/src/memgraph.cpp index d825cc0e7..ca03024fe 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -607,15 +607,15 @@ int main(int argc, char **argv) { // to minimize the impact of their failure on the main storage. 
memgraph::io::local_transport::LocalSystem ls; - auto unique_local_addr_query = memgraph::coordinator::Address::UniqueLocalAddress(); - auto io = ls.Register(unique_local_addr_query); + auto unique_local_coord_addr_query = memgraph::coordinator::Address::UniqueLocalAddress(); + auto io = ls.Register(unique_local_coord_addr_query); memgraph::machine_manager::MachineConfig config{ - .coordinator_addresses = std::vector{unique_local_addr_query}, + .coordinator_addresses = std::vector{unique_local_coord_addr_query}, .is_storage = true, .is_coordinator = true, - .listen_ip = unique_local_addr_query.last_known_ip, - .listen_port = unique_local_addr_query.last_known_port, + .listen_ip = unique_local_coord_addr_query.last_known_ip, + .listen_port = unique_local_coord_addr_query.last_known_port, }; memgraph::coordinator::ShardMap sm; @@ -640,6 +640,8 @@ int main(int argc, char **argv) { memgraph::machine_manager::MachineManager mm{io, config, coordinator}; std::jthread mm_thread([&mm] { mm.Run(); }); + auto rr_factory = std::make_unique(ls.GetTransportHandle()); + memgraph::query::v2::InterpreterContext interpreter_context{ (memgraph::storage::v3::Shard *)(nullptr), {.query = {.allow_load_csv = FLAGS_allow_load_csv}, @@ -650,7 +652,7 @@ int main(int argc, char **argv) { .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, .stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)}, FLAGS_data_directory, - std::move(io), + std::move(rr_factory), mm.CoordinatorAddress()}; SessionData session_data{&interpreter_context}; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 1c2d6dadf..c428d1a39 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -793,34 +793,24 @@ using RWType = plan::ReadWriteTypeChecker::RWType; InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config, const std::filesystem::path & 
/*data_directory*/, - io::Io io, + std::unique_ptr &&request_router_factory, coordinator::Address coordinator_addr) - : db(db), config(config), io{std::move(io)}, coordinator_address{coordinator_addr} {} + : db(db), + config(config), + coordinator_address{coordinator_addr}, + request_router_factory_{std::move(request_router_factory)} {} Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); - // TODO(tyler) make this deterministic so that it can be tested. - auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; - auto query_io = interpreter_context_->io.ForkLocal(random_uuid); + request_router_ = + interpreter_context_->request_router_factory_->CreateRequestRouter(interpreter_context_->coordinator_address); - request_router_ = std::make_unique>( - coordinator::CoordinatorClient( - query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), - std::move(query_io)); // Get edge ids - coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; - io::rsm::WriteRequest ww; - ww.operation = requests; - auto resp = interpreter_context_->io - .Request, - io::rsm::WriteResponse>( - interpreter_context_->coordinator_address, ww) - .Wait(); - if (resp.HasValue()) { - const auto alloc_edge_id_reps = - std::get(resp.GetValue().message.write_return); - interpreter_context_->edge_ids_alloc = {alloc_edge_id_reps.low, alloc_edge_id_reps.high}; + const auto edge_ids_alloc_min_max_pair = + request_router_->AllocateInitialEdgeIds(interpreter_context_->coordinator_address); + if (edge_ids_alloc_min_max_pair) { + interpreter_context_->edge_ids_alloc = {edge_ids_alloc_min_max_pair->first, edge_ids_alloc_min_max_pair->second}; } } diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index 985c9a90c..a83a26f11 100644 --- 
a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -17,6 +17,7 @@ #include "coordinator/coordinator.hpp" #include "coordinator/coordinator_client.hpp" #include "io/local_transport/local_transport.hpp" +#include "io/simulator/simulator_transport.hpp" #include "io/transport.hpp" #include "query/v2/auth_checker.hpp" #include "query/v2/bindings/cypher_main_visitor.hpp" @@ -172,7 +173,8 @@ struct PreparedQuery { struct InterpreterContext { explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config, const std::filesystem::path &data_directory, - io::Io io, coordinator::Address coordinator_addr); + std::unique_ptr &&request_router_factory, + coordinator::Address coordinator_addr); storage::v3::Shard *db; @@ -188,26 +190,25 @@ struct InterpreterContext { const InterpreterConfig config; IdAllocator edge_ids_alloc; - // TODO (antaljanosbenjamin) Figure out an abstraction for io::Io to make it possible to construct an interpreter - // context with a simulator transport without templatizing it. 
- io::Io io; coordinator::Address coordinator_address; storage::v3::LabelId NameToLabelId(std::string_view label_name) { - return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name)); + return storage::v3::LabelId::FromUint(query_id_mapper_.NameToId(label_name)); } storage::v3::PropertyId NameToPropertyId(std::string_view property_name) { - return storage::v3::PropertyId::FromUint(query_id_mapper.NameToId(property_name)); + return storage::v3::PropertyId::FromUint(query_id_mapper_.NameToId(property_name)); } storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) { - return storage::v3::EdgeTypeId::FromUint(query_id_mapper.NameToId(edge_type_name)); + return storage::v3::EdgeTypeId::FromUint(query_id_mapper_.NameToId(edge_type_name)); } + std::unique_ptr request_router_factory_; + private: // TODO Replace with local map of labels, properties and edge type ids - storage::v3::NameIdMapper query_id_mapper; + storage::v3::NameIdMapper query_id_mapper_; }; /// Function that is used to tell all active interpreters that they should stop @@ -296,7 +297,7 @@ class Interpreter final { */ void Abort(); - const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); } + RequestRouterInterface *GetRequestRouter() { return request_router_.get(); } private: struct QueryExecution { diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 996272fdc..de63cd76e 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -11,6 +11,7 @@ #pragma once +#include #include #include #include @@ -23,6 +24,7 @@ #include #include #include +#include #include #include "coordinator/coordinator.hpp" @@ -31,6 +33,7 @@ #include "coordinator/shard_map.hpp" #include "io/address.hpp" #include "io/errors.hpp" +#include "io/local_transport/local_transport.hpp" #include "io/rsm/raft.hpp" #include "io/rsm/rsm_client.hpp" #include "io/rsm/shard_rsm.hpp" @@ -124,6 +127,10 @@ class 
RequestRouterInterface { virtual std::optional MaybeNameToLabel(const std::string &name) const = 0; virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0; virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; + + virtual std::optional> AllocateInitialEdgeIds(io::Address coordinator_address) { + return {}; + } }; // TODO(kostasrim)rename this class template @@ -595,6 +602,23 @@ class RequestRouter : public RequestRouterInterface { edge_types_.StoreMapping(std::move(id_to_name)); } + std::optional> AllocateInitialEdgeIds(io::Address coordinator_address) override { + coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; + + io::rsm::WriteRequest ww; + ww.operation = requests; + auto resp = + io_.template Request, + io::rsm::WriteResponse>(coordinator_address, ww) + .Wait(); + if (resp.HasValue()) { + const auto alloc_edge_id_reps = + std::get(resp.GetValue().message.write_return); + return std::make_pair(alloc_edge_id_reps.low, alloc_edge_id_reps.high); + } + return {}; + } + ShardMap shards_map_; storage::v3::NameIdMapper properties_; storage::v3::NameIdMapper edge_types_; @@ -605,4 +629,89 @@ class RequestRouter : public RequestRouterInterface { coordinator::Hlc transaction_id_; // TODO(kostasrim) Add batch prefetching }; + +class RequestRouterFactory { + protected: + using LocalTransport = io::Io; + using SimulatorTransport = io::Io; + + using LocalTransportHandlePtr = std::shared_ptr; + using SimulatorTransportHandlePtr = std::shared_ptr; + + using TransportHandleVariant = std::variant; + + TransportHandleVariant transport_handle_; + + public: + explicit RequestRouterFactory(const TransportHandleVariant &transport_handle) : transport_handle_(transport_handle) {} + + RequestRouterFactory(const RequestRouterFactory &) = delete; + RequestRouterFactory &operator=(const RequestRouterFactory &) = delete; + RequestRouterFactory(RequestRouterFactory 
&&) = delete; + RequestRouterFactory &operator=(RequestRouterFactory &&) = delete; + + virtual ~RequestRouterFactory() = default; + + virtual TransportHandleVariant GetTransportHandle() { return transport_handle_; } + + virtual std::unique_ptr CreateRequestRouter( + const coordinator::Address &coordinator_address) const noexcept = 0; +}; + +class LocalRequestRouterFactory : public RequestRouterFactory { + public: + explicit LocalRequestRouterFactory(const TransportHandleVariant &transport_handle) + : RequestRouterFactory(transport_handle) {} + + std::unique_ptr CreateRequestRouter( + const coordinator::Address &coordinator_address) const noexcept override { + using TransportType = io::local_transport::LocalTransport; + auto actual_transport_handle = std::get(transport_handle_); + + boost::uuids::uuid random_uuid; + io::Address unique_local_addr_query; + + random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; + unique_local_addr_query = memgraph::coordinator::Address::UniqueLocalAddress(); + + TransportType local_transport(actual_transport_handle); + auto local_transport_io = io::Io(local_transport, unique_local_addr_query); + + auto query_io = local_transport_io.ForkLocal(random_uuid); + + return std::make_unique>( + coordinator::CoordinatorClient(query_io, coordinator_address, {coordinator_address}), + std::move(local_transport_io)); + } +}; + +class SimulatedRequestRouterFactory : public RequestRouterFactory { + mutable io::simulator::Simulator *simulator_; + coordinator::Address address_; + + public: + explicit SimulatedRequestRouterFactory(io::simulator::Simulator &simulator, coordinator::Address address) + : RequestRouterFactory(simulator.GetSimulatorHandle()), simulator_(&simulator), address_(address) {} + + std::unique_ptr CreateRequestRouter( + const coordinator::Address &coordinator_address) const noexcept override { + using TransportType = io::simulator::SimulatorTransport; + auto actual_transport_handle = std::get(transport_handle_); + + 
boost::uuids::uuid random_uuid; + io::Address unique_local_addr_query; + + // The simulated RR should not introduce stochastic behavior. + random_uuid = boost::uuids::uuid{3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + unique_local_addr_query = {.unique_id = boost::uuids::uuid{4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}}; + + auto io = simulator_->Register(unique_local_addr_query); + auto query_io = io.ForkLocal(random_uuid); + + return std::make_unique>( + coordinator::CoordinatorClient(query_io, coordinator_address, {coordinator_address}), + std::move(io)); + } +}; + } // namespace memgraph::query::v2 diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index 9e1a4c71e..f81f89798 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -17,7 +17,7 @@ function(add_simulation_test test_cpp) # requires unique logical target names set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) - target_link_libraries(${target_name} mg-storage-v3 mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2) + target_link_libraries(${target_name} mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2 mg-storage-v3) target_link_libraries(${target_name} Boost::headers) target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest) @@ -32,3 +32,4 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp) add_simulation_test(sharded_map.cpp) add_simulation_test(shard_rsm.cpp) add_simulation_test(cluster_property_test.cpp) +add_simulation_test(cluster_property_test_v2.cpp) diff --git a/tests/simulation/cluster_property_test_v2.cpp b/tests/simulation/cluster_property_test_v2.cpp new file mode 100644 index 000000000..2f121647b --- /dev/null +++ b/tests/simulation/cluster_property_test_v2.cpp @@ -0,0 +1,65 @@ +// Copyright 2022 Memgraph Ltd. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// This test serves as an example of a property-based model test. +// It generates a cluster configuration and a set of operations to +// apply against both the real system and a greatly simplified model. + +#include + +#include +#include +#include +#include + +#include "generated_operations.hpp" +#include "io/simulator/simulator_config.hpp" +#include "io/time.hpp" +#include "storage/v3/shard_manager.hpp" +#include "test_cluster.hpp" + +namespace memgraph::tests::simulation { + +using io::Duration; +using io::Time; +using io::simulator::SimulatorConfig; +using storage::v3::kMaximumCronInterval; + +RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) { + spdlog::cfg::load_env_levels(); + + SimulatorConfig sim_config{ + .drop_percent = 0, + .perform_timeouts = false, + .scramble_messages = true, + .rng_seed = rng_seed, + .start_time = Time::min(), + // TODO(tyler) set abort_time to something more restrictive than Time::max() + .abort_time = Time::max(), + }; + + std::vector queries = {"CREATE (n:test_label{property_1: 0, property_2: 0});", "MATCH (n) RETURN n;"}; + + auto [sim_stats_1, latency_stats_1] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries); + auto [sim_stats_2, latency_stats_2] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries); + + if (latency_stats_1 != latency_stats_2) { + spdlog::error("simulator stats diverged across runs"); + 
spdlog::error("run 1 simulator stats: {}", sim_stats_1); + spdlog::error("run 2 simulator stats: {}", sim_stats_2); + spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable()); + spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable()); + RC_ASSERT(latency_stats_1 == latency_stats_2); + RC_ASSERT(sim_stats_1 == sim_stats_2); + } +} + +} // namespace memgraph::tests::simulation diff --git a/tests/simulation/simulation_interpreter.hpp b/tests/simulation/simulation_interpreter.hpp new file mode 100644 index 000000000..b59f335be --- /dev/null +++ b/tests/simulation/simulation_interpreter.hpp @@ -0,0 +1,95 @@ +// Copyright 2022 Memgraph Ltd. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include +#include +#include "io/simulator/simulator_handle.hpp" +#include "query/v2/config.hpp" +#include "query/v2/discard_value_stream.hpp" +#include "query/v2/frontend/ast/ast.hpp" +#include "query/v2/interpreter.hpp" +#include "query/v2/request_router.hpp" + +#include + +// TODO(gvolfing) +// -How to set up the entire raft cluster with the QE. Also provide abstraction for that. +// -Pass an argument to the setup to determine how many times a query should be retried. 
+ +namespace memgraph::io::simulator { + +class SimulatedInterpreter { + using ResultStream = query::v2::DiscardValueResultStream; + + public: + explicit SimulatedInterpreter(std::unique_ptr &&interpreter_context) + : interpreter_context_(std::move(interpreter_context)) { + interpreter_ = std::make_unique(&(*interpreter_context_)); + } + + SimulatedInterpreter(const SimulatedInterpreter &) = delete; + SimulatedInterpreter &operator=(const SimulatedInterpreter &) = delete; + SimulatedInterpreter(SimulatedInterpreter &&) = delete; + SimulatedInterpreter &operator=(SimulatedInterpreter &&) = delete; + ~SimulatedInterpreter() = default; + + std::vector RunQueries(const std::vector &queries) { + std::vector results; + results.reserve(queries.size()); + + for (const auto &query : queries) { + results.emplace_back(RunQuery(query)); + } + return results; + } + + private: + ResultStream RunQuery(const std::string &query) { + ResultStream stream; + + std::map params; + const std::string *username = nullptr; + + interpreter_->BeginTransaction(); + + auto *rr = interpreter_->GetRequestRouter(); + rr->StartTransaction(); + + interpreter_->Prepare(query, params, username); + interpreter_->PullAll(&stream); + interpreter_->CommitTransaction(); + + return stream; + } + + std::unique_ptr interpreter_context_; + std::unique_ptr interpreter_; +}; + +SimulatedInterpreter SetUpInterpreter(Address coordinator_address, Simulator &simulator) { + auto rr_factory = + std::make_unique(simulator, coordinator_address); + + auto interpreter_context = std::make_unique( + (memgraph::storage::v3::Shard *)(nullptr), + memgraph::query::v2::InterpreterConfig{.query = {.allow_load_csv = true}, + .execution_timeout_sec = 600, + .replication_replica_check_frequency = std::chrono::seconds(1), + .default_kafka_bootstrap_servers = "", + .default_pulsar_service_url = "", + .stream_transaction_conflict_retries = 30, + .stream_transaction_retry_interval = std::chrono::milliseconds(500)}, + 
std::filesystem::path("mg_data"), std::move(rr_factory), coordinator_address); + + return SimulatedInterpreter(std::move(interpreter_context)); +} + +} // namespace memgraph::io::simulator diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 1392a0632..5aa792c16 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -36,6 +36,8 @@ #include "utils/print_helpers.hpp" #include "utils/variant_helpers.hpp" +#include "simulation_interpreter.hpp" + namespace memgraph::tests::simulation { using coordinator::Coordinator; @@ -277,4 +279,64 @@ std::pair RunClusterSimulation(const return std::make_pair(stats, histo); } +std::pair RunClusterSimulationWithQueries( + const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, const std::vector &queries) { + spdlog::info("========================== NEW SIMULATION =========================="); + + auto simulator = Simulator(sim_config); + + auto machine_1_addr = Address::TestAddress(1); + auto cli_addr = Address::TestAddress(2); + auto cli_addr_2 = Address::TestAddress(3); + + Io cli_io = simulator.Register(cli_addr); + Io cli_io_2 = simulator.Register(cli_addr_2); + + auto coordinator_addresses = std::vector{ + machine_1_addr, + }; + + ShardMap initialization_sm = TestShardMap(cluster_config.shards - 1, cluster_config.replication_factor); + + auto mm_1 = MkMm(simulator, coordinator_addresses, machine_1_addr, initialization_sm); + Address coordinator_address = mm_1.CoordinatorAddress(); + + auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1)); + simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr); + + auto detach_on_error = DetachIfDropped{.handle = mm_thread_1}; + + // TODO(tyler) clarify addresses of coordinator etc... 
as it's a mess + + CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); + WaitForShardsToInitialize(coordinator_client); + + auto simulated_interpreter = io::simulator::SetUpInterpreter(coordinator_address, simulator); + + auto query_results = simulated_interpreter.RunQueries(queries); + + // We have now completed our workload without failing any assertions, so we can + // disable detaching the worker thread, which will cause the mm_thread_1 jthread + // to be joined when this function returns. + detach_on_error.detach = false; + + simulator.ShutDown(); + + mm_thread_1.join(); + + SimulatorStats stats = simulator.Stats(); + + spdlog::info("total messages: {}", stats.total_messages); + spdlog::info("dropped messages: {}", stats.dropped_messages); + spdlog::info("timed out requests: {}", stats.timed_out_requests); + spdlog::info("total requests: {}", stats.total_requests); + spdlog::info("total responses: {}", stats.total_responses); + spdlog::info("simulator ticks: {}", stats.simulator_ticks); + + auto histo = cli_io_2.ResponseLatencies(); + + spdlog::info("========================== SUCCESS :) =========================="); + return std::make_pair(stats, histo); +} + } // namespace memgraph::tests::simulation