Implement cypher query based simulation testing

Make the Interpreter be able to handle SimulatorTransport as well. This
includes introducing changes that make it possible to use the different
transport types in a semi-polymorphic way with the introduction of
factory methods in the RequestRouter. The reason for this solution is
that the classes that represent the different transport types have
member function templates, that we can not make virtual. This solution
seemed to be the least convoluted. In the testing itself now it is
possible to pass a set of cypher queried to the interpreter which would
run these queries against the interpreter and the individual shards that
are managed and started up by the MachineManager with the different
entities communicating over the simulated network.
This commit is contained in:
gvolfing 2022-12-12 10:53:07 +01:00
parent 53040c6758
commit 3604046f68
11 changed files with 368 additions and 39 deletions

View File

@ -29,6 +29,8 @@ class LocalSystem {
return Io{local_transport, address}; return Io{local_transport, address};
} }
std::shared_ptr<LocalTransportHandle> &GetTransportHandle() { return local_transport_handle_; }
void ShutDown() { local_transport_handle_->ShutDown(); } void ShutDown() { local_transport_handle_->ShutDown(); }
}; };

View File

@ -41,7 +41,7 @@ class Simulator {
Io<SimulatorTransport> Register(Address address) { Io<SimulatorTransport> Register(Address address) {
std::uniform_int_distribution<uint64_t> seed_distrib; std::uniform_int_distribution<uint64_t> seed_distrib;
uint64_t seed = seed_distrib(rng_); uint64_t seed = seed_distrib(rng_);
return Io{SimulatorTransport{simulator_handle_, address, seed}, address}; return Io{SimulatorTransport(simulator_handle_, address, seed), address};
} }
void IncrementServerCountAndWaitForQuiescentState(Address address) { void IncrementServerCountAndWaitForQuiescentState(Address address) {
@ -49,5 +49,7 @@ class Simulator {
} }
SimulatorStats Stats() { return simulator_handle_->Stats(); } SimulatorStats Stats() { return simulator_handle_->Stats(); }
std::shared_ptr<SimulatorHandle> GetSimulatorHandle() { return simulator_handle_; }
}; };
}; // namespace memgraph::io::simulator }; // namespace memgraph::io::simulator

View File

@ -25,7 +25,7 @@ using memgraph::io::Time;
class SimulatorTransport { class SimulatorTransport {
std::shared_ptr<SimulatorHandle> simulator_handle_; std::shared_ptr<SimulatorHandle> simulator_handle_;
const Address address_; Address address_;
std::mt19937 rng_; std::mt19937 rng_;
public: public:

View File

@ -607,15 +607,15 @@ int main(int argc, char **argv) {
// to minimize the impact of their failure on the main storage. // to minimize the impact of their failure on the main storage.
memgraph::io::local_transport::LocalSystem ls; memgraph::io::local_transport::LocalSystem ls;
auto unique_local_addr_query = memgraph::coordinator::Address::UniqueLocalAddress(); auto unique_local_coord_addr_query = memgraph::coordinator::Address::UniqueLocalAddress();
auto io = ls.Register(unique_local_addr_query); auto io = ls.Register(unique_local_coord_addr_query);
memgraph::machine_manager::MachineConfig config{ memgraph::machine_manager::MachineConfig config{
.coordinator_addresses = std::vector<memgraph::io::Address>{unique_local_addr_query}, .coordinator_addresses = std::vector<memgraph::io::Address>{unique_local_coord_addr_query},
.is_storage = true, .is_storage = true,
.is_coordinator = true, .is_coordinator = true,
.listen_ip = unique_local_addr_query.last_known_ip, .listen_ip = unique_local_coord_addr_query.last_known_ip,
.listen_port = unique_local_addr_query.last_known_port, .listen_port = unique_local_coord_addr_query.last_known_port,
}; };
memgraph::coordinator::ShardMap sm; memgraph::coordinator::ShardMap sm;
@ -640,6 +640,8 @@ int main(int argc, char **argv) {
memgraph::machine_manager::MachineManager<memgraph::io::local_transport::LocalTransport> mm{io, config, coordinator}; memgraph::machine_manager::MachineManager<memgraph::io::local_transport::LocalTransport> mm{io, config, coordinator};
std::jthread mm_thread([&mm] { mm.Run(); }); std::jthread mm_thread([&mm] { mm.Run(); });
auto rr_factory = std::make_unique<memgraph::query::v2::LocalRequestRouterFactory>(ls.GetTransportHandle());
memgraph::query::v2::InterpreterContext interpreter_context{ memgraph::query::v2::InterpreterContext interpreter_context{
(memgraph::storage::v3::Shard *)(nullptr), (memgraph::storage::v3::Shard *)(nullptr),
{.query = {.allow_load_csv = FLAGS_allow_load_csv}, {.query = {.allow_load_csv = FLAGS_allow_load_csv},
@ -650,7 +652,7 @@ int main(int argc, char **argv) {
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries, .stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,
.stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)}, .stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)},
FLAGS_data_directory, FLAGS_data_directory,
std::move(io), std::move(rr_factory),
mm.CoordinatorAddress()}; mm.CoordinatorAddress()};
SessionData session_data{&interpreter_context}; SessionData session_data{&interpreter_context};

View File

@ -793,34 +793,24 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config, InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config,
const std::filesystem::path & /*data_directory*/, const std::filesystem::path & /*data_directory*/,
io::Io<io::local_transport::LocalTransport> io, std::unique_ptr<RequestRouterFactory> &&request_router_factory,
coordinator::Address coordinator_addr) coordinator::Address coordinator_addr)
: db(db), config(config), io{std::move(io)}, coordinator_address{coordinator_addr} {} : db(db),
config(config),
coordinator_address{coordinator_addr},
request_router_factory_{std::move(request_router_factory)} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) { Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL"); MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
// TODO(tyler) make this deterministic so that it can be tested. request_router_ =
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; interpreter_context_->request_router_factory_->CreateRequestRouter(interpreter_context_->coordinator_address);
auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
request_router_ = std::make_unique<RequestRouter<io::local_transport::LocalTransport>>(
coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
std::move(query_io));
// Get edge ids // Get edge ids
coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}}; const auto edge_ids_alloc_min_max_pair =
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww; request_router_->AllocateInitialEdgeIds(interpreter_context_->coordinator_address);
ww.operation = requests; if (edge_ids_alloc_min_max_pair) {
auto resp = interpreter_context_->io interpreter_context_->edge_ids_alloc = {edge_ids_alloc_min_max_pair->first, edge_ids_alloc_min_max_pair->second};
.Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>,
io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(
interpreter_context_->coordinator_address, ww)
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =
std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return);
interpreter_context_->edge_ids_alloc = {alloc_edge_id_reps.low, alloc_edge_id_reps.high};
} }
} }

View File

@ -17,6 +17,7 @@
#include "coordinator/coordinator.hpp" #include "coordinator/coordinator.hpp"
#include "coordinator/coordinator_client.hpp" #include "coordinator/coordinator_client.hpp"
#include "io/local_transport/local_transport.hpp" #include "io/local_transport/local_transport.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "io/transport.hpp" #include "io/transport.hpp"
#include "query/v2/auth_checker.hpp" #include "query/v2/auth_checker.hpp"
#include "query/v2/bindings/cypher_main_visitor.hpp" #include "query/v2/bindings/cypher_main_visitor.hpp"
@ -172,7 +173,8 @@ struct PreparedQuery {
struct InterpreterContext { struct InterpreterContext {
explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config, explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config,
const std::filesystem::path &data_directory, const std::filesystem::path &data_directory,
io::Io<io::local_transport::LocalTransport> io, coordinator::Address coordinator_addr); std::unique_ptr<RequestRouterFactory> &&request_router_factory,
coordinator::Address coordinator_addr);
storage::v3::Shard *db; storage::v3::Shard *db;
@ -188,26 +190,25 @@ struct InterpreterContext {
const InterpreterConfig config; const InterpreterConfig config;
IdAllocator edge_ids_alloc; IdAllocator edge_ids_alloc;
// TODO (antaljanosbenjamin) Figure out an abstraction for io::Io to make it possible to construct an interpreter
// context with a simulator transport without templatizing it.
io::Io<io::local_transport::LocalTransport> io;
coordinator::Address coordinator_address; coordinator::Address coordinator_address;
storage::v3::LabelId NameToLabelId(std::string_view label_name) { storage::v3::LabelId NameToLabelId(std::string_view label_name) {
return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name)); return storage::v3::LabelId::FromUint(query_id_mapper_.NameToId(label_name));
} }
storage::v3::PropertyId NameToPropertyId(std::string_view property_name) { storage::v3::PropertyId NameToPropertyId(std::string_view property_name) {
return storage::v3::PropertyId::FromUint(query_id_mapper.NameToId(property_name)); return storage::v3::PropertyId::FromUint(query_id_mapper_.NameToId(property_name));
} }
storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) { storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) {
return storage::v3::EdgeTypeId::FromUint(query_id_mapper.NameToId(edge_type_name)); return storage::v3::EdgeTypeId::FromUint(query_id_mapper_.NameToId(edge_type_name));
} }
std::unique_ptr<RequestRouterFactory> request_router_factory_;
private: private:
// TODO Replace with local map of labels, properties and edge type ids // TODO Replace with local map of labels, properties and edge type ids
storage::v3::NameIdMapper query_id_mapper; storage::v3::NameIdMapper query_id_mapper_;
}; };
/// Function that is used to tell all active interpreters that they should stop /// Function that is used to tell all active interpreters that they should stop
@ -296,7 +297,7 @@ class Interpreter final {
*/ */
void Abort(); void Abort();
const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); } RequestRouterInterface *GetRequestRouter() { return request_router_.get(); }
private: private:
struct QueryExecution { struct QueryExecution {

View File

@ -11,6 +11,7 @@
#pragma once #pragma once
#include <boost/uuid/uuid.hpp>
#include <chrono> #include <chrono>
#include <deque> #include <deque>
#include <iostream> #include <iostream>
@ -23,6 +24,7 @@
#include <stdexcept> #include <stdexcept>
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <variant>
#include <vector> #include <vector>
#include "coordinator/coordinator.hpp" #include "coordinator/coordinator.hpp"
@ -31,6 +33,7 @@
#include "coordinator/shard_map.hpp" #include "coordinator/shard_map.hpp"
#include "io/address.hpp" #include "io/address.hpp"
#include "io/errors.hpp" #include "io/errors.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/rsm/raft.hpp" #include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp" #include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp" #include "io/rsm/shard_rsm.hpp"
@ -124,6 +127,10 @@ class RequestRouterInterface {
virtual std::optional<storage::v3::LabelId> MaybeNameToLabel(const std::string &name) const = 0; virtual std::optional<storage::v3::LabelId> MaybeNameToLabel(const std::string &name) const = 0;
virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0; virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0;
virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0;
virtual std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) {
return {};
}
}; };
// TODO(kostasrim)rename this class template // TODO(kostasrim)rename this class template
@ -595,6 +602,23 @@ class RequestRouter : public RequestRouterInterface {
edge_types_.StoreMapping(std::move(id_to_name)); edge_types_.StoreMapping(std::move(id_to_name));
} }
std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) override {
coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}};
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww;
ww.operation = requests;
auto resp =
io_.template Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>,
io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(coordinator_address, ww)
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =
std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return);
return std::make_pair(alloc_edge_id_reps.low, alloc_edge_id_reps.high);
}
return {};
}
ShardMap shards_map_; ShardMap shards_map_;
storage::v3::NameIdMapper properties_; storage::v3::NameIdMapper properties_;
storage::v3::NameIdMapper edge_types_; storage::v3::NameIdMapper edge_types_;
@ -605,4 +629,89 @@ class RequestRouter : public RequestRouterInterface {
coordinator::Hlc transaction_id_; coordinator::Hlc transaction_id_;
// TODO(kostasrim) Add batch prefetching // TODO(kostasrim) Add batch prefetching
}; };
class RequestRouterFactory {
protected:
using LocalTransport = io::Io<io::local_transport::LocalTransport>;
using SimulatorTransport = io::Io<io::simulator::SimulatorTransport>;
using LocalTransportHandlePtr = std::shared_ptr<io::local_transport::LocalTransportHandle>;
using SimulatorTransportHandlePtr = std::shared_ptr<io::simulator::SimulatorHandle>;
using TransportHandleVariant = std::variant<LocalTransportHandlePtr, SimulatorTransportHandlePtr>;
TransportHandleVariant transport_handle_;
public:
explicit RequestRouterFactory(const TransportHandleVariant &transport_handle) : transport_handle_(transport_handle) {}
RequestRouterFactory(const RequestRouterFactory &) = delete;
RequestRouterFactory &operator=(const RequestRouterFactory &) = delete;
RequestRouterFactory(RequestRouterFactory &&) = delete;
RequestRouterFactory &operator=(RequestRouterFactory &&) = delete;
virtual ~RequestRouterFactory() = default;
virtual TransportHandleVariant GetTransportHandle() { return transport_handle_; }
virtual std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const noexcept = 0;
};
class LocalRequestRouterFactory : public RequestRouterFactory {
public:
explicit LocalRequestRouterFactory(const TransportHandleVariant &transport_handle)
: RequestRouterFactory(transport_handle) {}
std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const noexcept override {
using TransportType = io::local_transport::LocalTransport;
auto actual_transport_handle = std::get<LocalTransportHandlePtr>(transport_handle_);
boost::uuids::uuid random_uuid;
io::Address unique_local_addr_query;
random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
unique_local_addr_query = memgraph::coordinator::Address::UniqueLocalAddress();
TransportType local_transport(actual_transport_handle);
auto local_transport_io = io::Io<TransportType>(local_transport, unique_local_addr_query);
auto query_io = local_transport_io.ForkLocal(random_uuid);
return std::make_unique<RequestRouter<TransportType>>(
coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}),
std::move(local_transport_io));
}
};
class SimulatedRequestRouterFactory : public RequestRouterFactory {
mutable io::simulator::Simulator *simulator_;
coordinator::Address address_;
public:
explicit SimulatedRequestRouterFactory(io::simulator::Simulator &simulator, coordinator::Address address)
: RequestRouterFactory(simulator.GetSimulatorHandle()), simulator_(&simulator), address_(address) {}
std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const noexcept override {
using TransportType = io::simulator::SimulatorTransport;
auto actual_transport_handle = std::get<SimulatorTransportHandlePtr>(transport_handle_);
boost::uuids::uuid random_uuid;
io::Address unique_local_addr_query;
// The simulated RR should not introduce stochastic behavior.
random_uuid = boost::uuids::uuid{3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
unique_local_addr_query = {.unique_id = boost::uuids::uuid{4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}};
auto io = simulator_->Register(unique_local_addr_query);
auto query_io = io.ForkLocal(random_uuid);
return std::make_unique<RequestRouter<TransportType>>(
coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}),
std::move(io));
}
};
} // namespace memgraph::query::v2 } // namespace memgraph::query::v2

View File

@ -17,7 +17,7 @@ function(add_simulation_test test_cpp)
# requires unique logical target names # requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name}) set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} mg-storage-v3 mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2) target_link_libraries(${target_name} mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2 mg-storage-v3)
target_link_libraries(${target_name} Boost::headers) target_link_libraries(${target_name} Boost::headers)
target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest) target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest)
@ -32,3 +32,4 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp)
add_simulation_test(sharded_map.cpp) add_simulation_test(sharded_map.cpp)
add_simulation_test(shard_rsm.cpp) add_simulation_test(shard_rsm.cpp)
add_simulation_test(cluster_property_test.cpp) add_simulation_test(cluster_property_test.cpp)
add_simulation_test(cluster_property_test_v2.cpp)

View File

@ -0,0 +1,65 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// This test serves as an example of a property-based model test.
// It generates a cluster configuration and a set of operations to
// apply against both the real system and a greatly simplified model.
#include <chrono>
#include <gtest/gtest.h>
#include <rapidcheck.h>
#include <rapidcheck/gtest.h>
#include <spdlog/cfg/env.h>
#include "generated_operations.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/time.hpp"
#include "storage/v3/shard_manager.hpp"
#include "test_cluster.hpp"
namespace memgraph::tests::simulation {
using io::Duration;
using io::Time;
using io::simulator::SimulatorConfig;
using storage::v3::kMaximumCronInterval;
RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) {
spdlog::cfg::load_env_levels();
SimulatorConfig sim_config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = true,
.rng_seed = rng_seed,
.start_time = Time::min(),
// TODO(tyler) set abort_time to something more restrictive than Time::max()
.abort_time = Time::max(),
};
std::vector<std::string> queries = {"CREATE (n:test_label{property_1: 0, property_2: 0});", "MATCH (n) RETURN n;"};
auto [sim_stats_1, latency_stats_1] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries);
auto [sim_stats_2, latency_stats_2] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries);
if (latency_stats_1 != latency_stats_2) {
spdlog::error("simulator stats diverged across runs");
spdlog::error("run 1 simulator stats: {}", sim_stats_1);
spdlog::error("run 2 simulator stats: {}", sim_stats_2);
spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
RC_ASSERT(latency_stats_1 == latency_stats_2);
RC_ASSERT(sim_stats_1 == sim_stats_2);
}
}
} // namespace memgraph::tests::simulation

View File

@ -0,0 +1,95 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <machine_manager/machine_config.hpp>
#include <machine_manager/machine_manager.hpp>
#include "io/simulator/simulator_handle.hpp"
#include "query/v2/config.hpp"
#include "query/v2/discard_value_stream.hpp"
#include "query/v2/frontend/ast/ast.hpp"
#include "query/v2/interpreter.hpp"
#include "query/v2/request_router.hpp"
#include <memory>
// TODO(gvolfing)
// -How to set up the entire raft cluster with the QE. Also provide abrstraction for that.
// -Pass an argument to the setup to determine, how many times the retry of a query should happen.
namespace memgraph::io::simulator {
class SimulatedInterpreter {
using ResultStream = query::v2::DiscardValueResultStream;
public:
explicit SimulatedInterpreter(std::unique_ptr<query::v2::InterpreterContext> &&interpreter_context)
: interpreter_context_(std::move(interpreter_context)) {
interpreter_ = std::make_unique<memgraph::query::v2::Interpreter>(&(*interpreter_context_));
}
SimulatedInterpreter(const SimulatedInterpreter &) = delete;
SimulatedInterpreter &operator=(const SimulatedInterpreter &) = delete;
SimulatedInterpreter(SimulatedInterpreter &&) = delete;
SimulatedInterpreter &operator=(SimulatedInterpreter &&) = delete;
~SimulatedInterpreter() = default;
std::vector<ResultStream> RunQueries(const std::vector<std::string> &queries) {
std::vector<ResultStream> results;
results.reserve(queries.size());
for (const auto &query : queries) {
results.emplace_back(RunQuery(query));
}
return results;
}
private:
ResultStream RunQuery(const std::string &query) {
ResultStream stream;
std::map<std::string, memgraph::storage::v3::PropertyValue> params;
const std::string *username = nullptr;
interpreter_->BeginTransaction();
auto *rr = interpreter_->GetRequestRouter();
rr->StartTransaction();
interpreter_->Prepare(query, params, username);
interpreter_->PullAll(&stream);
interpreter_->CommitTransaction();
return stream;
}
std::unique_ptr<query::v2::InterpreterContext> interpreter_context_;
std::unique_ptr<query::v2::Interpreter> interpreter_;
};
SimulatedInterpreter SetUpInterpreter(Address coordinator_address, Simulator &simulator) {
auto rr_factory =
std::make_unique<memgraph::query::v2::SimulatedRequestRouterFactory>(simulator, coordinator_address);
auto interpreter_context = std::make_unique<memgraph::query::v2::InterpreterContext>(
(memgraph::storage::v3::Shard *)(nullptr),
memgraph::query::v2::InterpreterConfig{.query = {.allow_load_csv = true},
.execution_timeout_sec = 600,
.replication_replica_check_frequency = std::chrono::seconds(1),
.default_kafka_bootstrap_servers = "",
.default_pulsar_service_url = "",
.stream_transaction_conflict_retries = 30,
.stream_transaction_retry_interval = std::chrono::milliseconds(500)},
std::filesystem::path("mg_data"), std::move(rr_factory), coordinator_address);
return SimulatedInterpreter(std::move(interpreter_context));
}
} // namespace memgraph::io::simulator

View File

@ -36,6 +36,8 @@
#include "utils/print_helpers.hpp" #include "utils/print_helpers.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
#include "simulation_interpreter.hpp"
namespace memgraph::tests::simulation { namespace memgraph::tests::simulation {
using coordinator::Coordinator; using coordinator::Coordinator;
@ -277,4 +279,64 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
return std::make_pair(stats, histo); return std::make_pair(stats, histo);
} }
std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulationWithQueries(
const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, const std::vector<std::string> &queries) {
spdlog::info("========================== NEW SIMULATION ==========================");
auto simulator = Simulator(sim_config);
auto machine_1_addr = Address::TestAddress(1);
auto cli_addr = Address::TestAddress(2);
auto cli_addr_2 = Address::TestAddress(3);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2);
auto coordinator_addresses = std::vector{
machine_1_addr,
};
ShardMap initialization_sm = TestShardMap(cluster_config.shards - 1, cluster_config.replication_factor);
auto mm_1 = MkMm(simulator, coordinator_addresses, machine_1_addr, initialization_sm);
Address coordinator_address = mm_1.CoordinatorAddress();
auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr);
auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
// TODO(tyler) clarify addresses of coordinator etc... as it's a mess
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
WaitForShardsToInitialize(coordinator_client);
auto simulated_interpreter = io::simulator::SetUpInterpreter(coordinator_address, simulator);
auto query_results = simulated_interpreter.RunQueries(queries);
// We have now completed our workload without failing any assertions, so we can
// disable detaching the worker thread, which will cause the mm_thread_1 jthread
// to be joined when this function returns.
detach_on_error.detach = false;
simulator.ShutDown();
mm_thread_1.join();
SimulatorStats stats = simulator.Stats();
spdlog::info("total messages: {}", stats.total_messages);
spdlog::info("dropped messages: {}", stats.dropped_messages);
spdlog::info("timed out requests: {}", stats.timed_out_requests);
spdlog::info("total requests: {}", stats.total_requests);
spdlog::info("total responses: {}", stats.total_responses);
spdlog::info("simulator ticks: {}", stats.simulator_ticks);
auto histo = cli_io_2.ResponseLatencies();
spdlog::info("========================== SUCCESS :) ==========================");
return std::make_pair(stats, histo);
}
} // namespace memgraph::tests::simulation } // namespace memgraph::tests::simulation