Merge branch 'project-pineapples' into T1214-MG-implement-expand-with-multiframe

This commit is contained in:
János Benjamin Antal 2023-01-25 16:19:56 +01:00 committed by GitHub
commit 7d3d52c067
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 356 additions and 43 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -41,7 +41,7 @@ class Simulator {
Io<SimulatorTransport> Register(Address address) {
std::uniform_int_distribution<uint64_t> seed_distrib;
uint64_t seed = seed_distrib(rng_);
return Io{SimulatorTransport{simulator_handle_, address, seed}, address};
return Io{SimulatorTransport(simulator_handle_, address, seed), address};
}
void IncrementServerCountAndWaitForQuiescentState(Address address) {
@ -50,8 +50,12 @@ class Simulator {
SimulatorStats Stats() { return simulator_handle_->Stats(); }
std::shared_ptr<SimulatorHandle> GetSimulatorHandle() const { return simulator_handle_; }
std::function<bool()> GetSimulatorTickClosure() {
std::function<bool()> tick_closure = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };
std::function<bool()> tick_closure = [handle_copy = simulator_handle_] {
return handle_copy->MaybeTickSimulator();
};
return tick_closure;
}
};

View File

@ -26,7 +26,7 @@ using memgraph::io::Time;
class SimulatorTransport {
std::shared_ptr<SimulatorHandle> simulator_handle_;
const Address address_;
Address address_;
std::mt19937 rng_;
public:
@ -36,7 +36,9 @@ class SimulatorTransport {
template <Message RequestT, Message ResponseT>
ResponseFuture<ResponseT> Request(Address to_address, Address from_address, RequestT request,
std::function<void()> notification, Duration timeout) {
std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] { return handle_copy->MaybeTickSimulator(); };
std::function<bool()> tick_simulator = [handle_copy = simulator_handle_] {
return handle_copy->MaybeTickSimulator();
};
return simulator_handle_->template SubmitRequest<RequestT, ResponseT>(
to_address, from_address, std::move(request), timeout, std::move(tick_simulator), std::move(notification));

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -640,6 +640,8 @@ int main(int argc, char **argv) {
memgraph::machine_manager::MachineManager<memgraph::io::local_transport::LocalTransport> mm{io, config, coordinator};
std::jthread mm_thread([&mm] { mm.Run(); });
auto rr_factory = std::make_unique<memgraph::query::v2::LocalRequestRouterFactory>(io);
memgraph::query::v2::InterpreterContext interpreter_context{
(memgraph::storage::v3::Shard *)(nullptr),
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
@ -650,7 +652,7 @@ int main(int argc, char **argv) {
.stream_transaction_conflict_retries = FLAGS_stream_transaction_conflict_retries,
.stream_transaction_retry_interval = std::chrono::milliseconds(FLAGS_stream_transaction_retry_interval)},
FLAGS_data_directory,
std::move(io),
std::move(rr_factory),
mm.CoordinatorAddress()};
SessionData session_data{&interpreter_context};

View File

@ -910,34 +910,24 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
InterpreterContext::InterpreterContext(storage::v3::Shard *db, const InterpreterConfig config,
const std::filesystem::path & /*data_directory*/,
io::Io<io::local_transport::LocalTransport> io,
std::unique_ptr<RequestRouterFactory> request_router_factory,
coordinator::Address coordinator_addr)
: db(db), config(config), io{std::move(io)}, coordinator_address{coordinator_addr} {}
: db(db),
config(config),
coordinator_address{coordinator_addr},
request_router_factory_{std::move(request_router_factory)} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");
// TODO(tyler) make this deterministic so that it can be tested.
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
request_router_ =
interpreter_context_->request_router_factory_->CreateRequestRouter(interpreter_context_->coordinator_address);
request_router_ = std::make_unique<RequestRouter<io::local_transport::LocalTransport>>(
coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
std::move(query_io));
// Get edge ids
coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}};
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww;
ww.operation = requests;
auto resp = interpreter_context_->io
.Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>,
io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(
interpreter_context_->coordinator_address, ww)
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =
std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return);
interpreter_context_->edge_ids_alloc = {alloc_edge_id_reps.low, alloc_edge_id_reps.high};
const auto edge_ids_alloc_min_max_pair =
request_router_->AllocateInitialEdgeIds(interpreter_context_->coordinator_address);
if (edge_ids_alloc_min_max_pair) {
interpreter_context_->edge_ids_alloc = {edge_ids_alloc_min_max_pair->first, edge_ids_alloc_min_max_pair->second};
}
}

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -16,7 +16,6 @@
#include "coordinator/coordinator.hpp"
#include "coordinator/coordinator_client.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/transport.hpp"
#include "query/v2/auth_checker.hpp"
#include "query/v2/bindings/cypher_main_visitor.hpp"
@ -172,7 +171,8 @@ struct PreparedQuery {
struct InterpreterContext {
explicit InterpreterContext(storage::v3::Shard *db, InterpreterConfig config,
const std::filesystem::path &data_directory,
io::Io<io::local_transport::LocalTransport> io, coordinator::Address coordinator_addr);
std::unique_ptr<RequestRouterFactory> request_router_factory,
coordinator::Address coordinator_addr);
storage::v3::Shard *db;
@ -188,26 +188,24 @@ struct InterpreterContext {
const InterpreterConfig config;
IdAllocator edge_ids_alloc;
// TODO (antaljanosbenjamin) Figure out an abstraction for io::Io to make it possible to construct an interpreter
// context with a simulator transport without templatizing it.
io::Io<io::local_transport::LocalTransport> io;
coordinator::Address coordinator_address;
std::unique_ptr<RequestRouterFactory> request_router_factory_;
storage::v3::LabelId NameToLabelId(std::string_view label_name) {
return storage::v3::LabelId::FromUint(query_id_mapper.NameToId(label_name));
return storage::v3::LabelId::FromUint(query_id_mapper_.NameToId(label_name));
}
storage::v3::PropertyId NameToPropertyId(std::string_view property_name) {
return storage::v3::PropertyId::FromUint(query_id_mapper.NameToId(property_name));
return storage::v3::PropertyId::FromUint(query_id_mapper_.NameToId(property_name));
}
storage::v3::EdgeTypeId NameToEdgeTypeId(std::string_view edge_type_name) {
return storage::v3::EdgeTypeId::FromUint(query_id_mapper.NameToId(edge_type_name));
return storage::v3::EdgeTypeId::FromUint(query_id_mapper_.NameToId(edge_type_name));
}
private:
// TODO Replace with local map of labels, properties and edge type ids
storage::v3::NameIdMapper query_id_mapper;
storage::v3::NameIdMapper query_id_mapper_;
};
/// Function that is used to tell all active interpreters that they should stop
@ -297,12 +295,15 @@ class Interpreter final {
void Abort();
const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); }
void InstallSimulatorTicker(std::function<bool()> &&tick_simulator) {
request_router_->InstallSimulatorTicker(tick_simulator);
}
private:
struct QueryExecution {
std::optional<PreparedQuery> prepared_query;
utils::MonotonicBufferResource execution_memory{kExecutionMemoryBlockSize};
utils::ResourceWithOutOfMemoryException execution_memory_with_exception{&execution_memory};
std::optional<PreparedQuery> prepared_query;
std::map<std::string, TypedValue> summary;
std::vector<Notification> notifications;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -24,14 +24,18 @@
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <variant>
#include <vector>
#include <boost/uuid/uuid.hpp>
#include "coordinator/coordinator.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/local_transport/local_transport.hpp"
#include "io/notifier.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
@ -114,6 +118,9 @@ class RequestRouterInterface {
virtual std::optional<storage::v3::LabelId> MaybeNameToLabel(const std::string &name) const = 0;
virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0;
virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0;
virtual std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) = 0;
virtual void InstallSimulatorTicker(std::function<bool()> tick_simulator) = 0;
virtual const std::vector<coordinator::SchemaProperty> &GetSchemaForLabel(storage::v3::LabelId label) const = 0;
};
@ -139,7 +146,7 @@ class RequestRouter : public RequestRouterInterface {
~RequestRouter() override {}
void InstallSimulatorTicker(std::function<bool()> tick_simulator) {
void InstallSimulatorTicker(std::function<bool()> tick_simulator) override {
notifier_.InstallSimulatorTicker(tick_simulator);
}
@ -715,6 +722,23 @@ class RequestRouter : public RequestRouterInterface {
edge_types_.StoreMapping(std::move(id_to_name));
}
std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) override {
coordinator::CoordinatorWriteRequests requests{coordinator::AllocateEdgeIdBatchRequest{.batch_size = 1000000}};
io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests> ww;
ww.operation = requests;
auto resp =
io_.template Request<io::rsm::WriteRequest<coordinator::CoordinatorWriteRequests>,
io::rsm::WriteResponse<coordinator::CoordinatorWriteResponses>>(coordinator_address, ww)
.Wait();
if (resp.HasValue()) {
const auto alloc_edge_id_reps =
std::get<coordinator::AllocateEdgeIdBatchResponse>(resp.GetValue().message.write_return);
return std::make_pair(alloc_edge_id_reps.low, alloc_edge_id_reps.high);
}
return {};
}
ShardMap shards_map_;
storage::v3::NameIdMapper properties_;
storage::v3::NameIdMapper edge_types_;
@ -726,4 +750,66 @@ class RequestRouter : public RequestRouterInterface {
io::Notifier notifier_ = {};
// TODO(kostasrim) Add batch prefetching
};
class RequestRouterFactory {
public:
RequestRouterFactory() = default;
RequestRouterFactory(const RequestRouterFactory &) = delete;
RequestRouterFactory &operator=(const RequestRouterFactory &) = delete;
RequestRouterFactory(RequestRouterFactory &&) = delete;
RequestRouterFactory &operator=(RequestRouterFactory &&) = delete;
virtual ~RequestRouterFactory() = default;
virtual std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const = 0;
};
class LocalRequestRouterFactory : public RequestRouterFactory {
using LocalTransportIo = io::Io<io::local_transport::LocalTransport>;
LocalTransportIo &io_;
public:
explicit LocalRequestRouterFactory(LocalTransportIo &io) : io_(io) {}
std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const override {
using TransportType = io::local_transport::LocalTransport;
auto query_io = io_.ForkLocal(boost::uuids::uuid{boost::uuids::random_generator()()});
auto local_transport_io = io_.ForkLocal(boost::uuids::uuid{boost::uuids::random_generator()()});
return std::make_unique<RequestRouter<TransportType>>(
coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}),
std::move(local_transport_io));
}
};
class SimulatedRequestRouterFactory : public RequestRouterFactory {
io::simulator::Simulator *simulator_;
public:
explicit SimulatedRequestRouterFactory(io::simulator::Simulator &simulator) : simulator_(&simulator) {}
std::unique_ptr<RequestRouterInterface> CreateRequestRouter(
const coordinator::Address &coordinator_address) const override {
using TransportType = io::simulator::SimulatorTransport;
auto actual_transport_handle = simulator_->GetSimulatorHandle();
boost::uuids::uuid random_uuid;
io::Address unique_local_addr_query;
// The simulated RR should not introduce stochastic behavior.
random_uuid = boost::uuids::uuid{3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
unique_local_addr_query = {.unique_id = boost::uuids::uuid{4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}};
auto io = simulator_->Register(unique_local_addr_query);
auto query_io = io.ForkLocal(random_uuid);
return std::make_unique<RequestRouter<TransportType>>(
coordinator::CoordinatorClient<TransportType>(query_io, coordinator_address, {coordinator_address}),
std::move(io));
}
};
} // namespace memgraph::query::v2

View File

@ -17,7 +17,7 @@ function(add_simulation_test test_cpp)
# requires unique logical target names
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
target_link_libraries(${target_name} mg-storage-v3 mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2)
target_link_libraries(${target_name} mg-communication mg-utils mg-io mg-io-simulator mg-coordinator mg-query-v2 mg-storage-v3)
target_link_libraries(${target_name} Boost::headers)
target_link_libraries(${target_name} gtest gtest_main gmock rapidcheck rapidcheck_gtest)
@ -32,4 +32,5 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp)
add_simulation_test(sharded_map.cpp)
add_simulation_test(shard_rsm.cpp)
add_simulation_test(cluster_property_test.cpp)
add_simulation_test(cluster_property_test_cypher_queries.cpp)
add_simulation_test(request_router.cpp)

View File

@ -0,0 +1,64 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
// This test serves as an example of a property-based model test.
// It generates a cluster configuration and a set of operations to
// apply against both the real system and a greatly simplified model.
#include <chrono>
#include <gtest/gtest.h>
#include <rapidcheck.h>
#include <rapidcheck/gtest.h>
#include <spdlog/cfg/env.h>
#include "generated_operations.hpp"
#include "io/simulator/simulator_config.hpp"
#include "io/time.hpp"
#include "storage/v3/shard_manager.hpp"
#include "test_cluster.hpp"
namespace memgraph::tests::simulation {
using io::Duration;
using io::Time;
using io::simulator::SimulatorConfig;
using storage::v3::kMaximumCronInterval;
RC_GTEST_PROP(RandomClusterConfig, HappyPath, (ClusterConfig cluster_config, NonEmptyOpVec ops, uint64_t rng_seed)) {
spdlog::cfg::load_env_levels();
SimulatorConfig sim_config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = true,
.rng_seed = rng_seed,
.start_time = Time::min(),
.abort_time = Time::max(),
};
std::vector<std::string> queries = {"CREATE (n:test_label{property_1: 0, property_2: 0});", "MATCH (n) RETURN n;"};
auto [sim_stats_1, latency_stats_1] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries);
auto [sim_stats_2, latency_stats_2] = RunClusterSimulationWithQueries(sim_config, cluster_config, queries);
if (latency_stats_1 != latency_stats_2) {
spdlog::error("simulator stats diverged across runs");
spdlog::error("run 1 simulator stats: {}", sim_stats_1);
spdlog::error("run 2 simulator stats: {}", sim_stats_2);
spdlog::error("run 1 latency:\n{}", latency_stats_1.SummaryTable());
spdlog::error("run 2 latency:\n{}", latency_stats_2.SummaryTable());
RC_ASSERT(latency_stats_1 == latency_stats_2);
RC_ASSERT(sim_stats_1 == sim_stats_2);
}
}
} // namespace memgraph::tests::simulation

View File

@ -0,0 +1,93 @@
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "io/simulator/simulator_handle.hpp"
#include "machine_manager/machine_config.hpp"
#include "machine_manager/machine_manager.hpp"
#include "query/v2/config.hpp"
#include "query/v2/discard_value_stream.hpp"
#include "query/v2/frontend/ast/ast.hpp"
#include "query/v2/interpreter.hpp"
#include "query/v2/request_router.hpp"
#include <string>
#include <vector>
// TODO(gvolfing)
// -How to set up the entire raft cluster with the QE. Also provide abrstraction for that.
// -Pass an argument to the setup to determine, how many times the retry of a query should happen.
namespace memgraph::io::simulator {
class SimulatedInterpreter {
using ResultStream = query::v2::DiscardValueResultStream;
public:
explicit SimulatedInterpreter(std::unique_ptr<query::v2::InterpreterContext> interpreter_context)
: interpreter_context_(std::move(interpreter_context)) {
interpreter_ = std::make_unique<memgraph::query::v2::Interpreter>(interpreter_context_.get());
}
SimulatedInterpreter(const SimulatedInterpreter &) = delete;
SimulatedInterpreter &operator=(const SimulatedInterpreter &) = delete;
SimulatedInterpreter(SimulatedInterpreter &&) = delete;
SimulatedInterpreter &operator=(SimulatedInterpreter &&) = delete;
~SimulatedInterpreter() = default;
void InstallSimulatorTicker(Simulator &simulator) {
interpreter_->InstallSimulatorTicker(simulator.GetSimulatorTickClosure());
}
std::vector<ResultStream> RunQueries(const std::vector<std::string> &queries) {
std::vector<ResultStream> results;
results.reserve(queries.size());
for (const auto &query : queries) {
results.emplace_back(RunQuery(query));
}
return results;
}
private:
ResultStream RunQuery(const std::string &query) {
ResultStream stream;
std::map<std::string, memgraph::storage::v3::PropertyValue> params;
const std::string *username = nullptr;
interpreter_->Prepare(query, params, username);
interpreter_->PullAll(&stream);
return stream;
}
std::unique_ptr<query::v2::InterpreterContext> interpreter_context_;
std::unique_ptr<query::v2::Interpreter> interpreter_;
};
SimulatedInterpreter SetUpInterpreter(Address coordinator_address, Simulator &simulator) {
auto rr_factory = std::make_unique<memgraph::query::v2::SimulatedRequestRouterFactory>(simulator);
auto interpreter_context = std::make_unique<memgraph::query::v2::InterpreterContext>(
nullptr,
memgraph::query::v2::InterpreterConfig{.query = {.allow_load_csv = true},
.execution_timeout_sec = 600,
.replication_replica_check_frequency = std::chrono::seconds(1),
.default_kafka_bootstrap_servers = "",
.default_pulsar_service_url = "",
.stream_transaction_conflict_retries = 30,
.stream_transaction_retry_interval = std::chrono::milliseconds(500)},
std::filesystem::path("mg_data"), std::move(rr_factory), coordinator_address);
return SimulatedInterpreter(std::move(interpreter_context));
}
} // namespace memgraph::io::simulator

View File

@ -36,6 +36,8 @@
#include "utils/print_helpers.hpp"
#include "utils/variant_helpers.hpp"
#include "simulation_interpreter.hpp"
namespace memgraph::tests::simulation {
using coordinator::Coordinator;
@ -279,4 +281,65 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
return std::make_pair(stats, histo);
}
std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulationWithQueries(
const SimulatorConfig &sim_config, const ClusterConfig &cluster_config, const std::vector<std::string> &queries) {
spdlog::info("========================== NEW SIMULATION ==========================");
auto simulator = Simulator(sim_config);
auto machine_1_addr = Address::TestAddress(1);
auto cli_addr = Address::TestAddress(2);
auto cli_addr_2 = Address::TestAddress(3);
Io<SimulatorTransport> cli_io = simulator.Register(cli_addr);
Io<SimulatorTransport> cli_io_2 = simulator.Register(cli_addr_2);
auto coordinator_addresses = std::vector{
machine_1_addr,
};
ShardMap initialization_sm = TestShardMap(cluster_config.shards - 1, cluster_config.replication_factor);
auto mm_1 = MkMm(simulator, coordinator_addresses, machine_1_addr, initialization_sm);
Address coordinator_address = mm_1.CoordinatorAddress();
auto mm_thread_1 = std::jthread(RunMachine, std::move(mm_1));
simulator.IncrementServerCountAndWaitForQuiescentState(machine_1_addr);
auto detach_on_error = DetachIfDropped{.handle = mm_thread_1};
// TODO(tyler) clarify addresses of coordinator etc... as it's a mess
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
WaitForShardsToInitialize(coordinator_client);
auto simulated_interpreter = io::simulator::SetUpInterpreter(coordinator_address, simulator);
simulated_interpreter.InstallSimulatorTicker(simulator);
auto query_results = simulated_interpreter.RunQueries(queries);
// We have now completed our workload without failing any assertions, so we can
// disable detaching the worker thread, which will cause the mm_thread_1 jthread
// to be joined when this function returns.
detach_on_error.detach = false;
simulator.ShutDown();
mm_thread_1.join();
SimulatorStats stats = simulator.Stats();
spdlog::info("total messages: {}", stats.total_messages);
spdlog::info("dropped messages: {}", stats.dropped_messages);
spdlog::info("timed out requests: {}", stats.timed_out_requests);
spdlog::info("total requests: {}", stats.total_requests);
spdlog::info("total responses: {}", stats.total_responses);
spdlog::info("simulator ticks: {}", stats.simulator_ticks);
auto histo = cli_io_2.ResponseLatencies();
spdlog::info("========================== SUCCESS :) ==========================");
return std::make_pair(stats, histo);
}
} // namespace memgraph::tests::simulation

View File

@ -42,6 +42,8 @@ class MockedRequestRouter : public RequestRouterInterface {
MOCK_METHOD(std::optional<storage::v3::LabelId>, MaybeNameToLabel, (const std::string &), (const));
MOCK_METHOD(bool, IsPrimaryLabel, (storage::v3::LabelId), (const));
MOCK_METHOD(bool, IsPrimaryKey, (storage::v3::LabelId, storage::v3::PropertyId), (const));
MOCK_METHOD((std::optional<std::pair<uint64_t, uint64_t>>), AllocateInitialEdgeIds, (io::Address));
MOCK_METHOD(void, InstallSimulatorTicker, (std::function<bool()>));
MOCK_METHOD(const std::vector<coordinator::SchemaProperty> &, GetSchemaForLabel, (storage::v3::LabelId), (const));
};

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source

View File

@ -125,6 +125,11 @@ class MockedRequestRouter : public RequestRouterInterface {
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { return true; }
std::optional<std::pair<uint64_t, uint64_t>> AllocateInitialEdgeIds(io::Address coordinator_address) override {
return {};
}
void InstallSimulatorTicker(std::function<bool()> tick_simulator) override {}
const std::vector<coordinator::SchemaProperty> &GetSchemaForLabel(storage::v3::LabelId /*label*/) const override {
static std::vector<coordinator::SchemaProperty> schema;
return schema;