middleware prototype

This commit is contained in:
Kostas Kyrimis 2022-09-02 16:19:42 +03:00
parent 445cd2d206
commit 1a2f138e09
6 changed files with 697 additions and 4 deletions

View File

@ -48,7 +48,7 @@ using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
using ShardRsmKey = std::vector<PropertyValue>;
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
struct StorageWriteRequest {
LabelId label_id;

230
src/query/v2/middleware.hpp Normal file
View File

@ -0,0 +1,230 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <unordered_map>
#include <vector>
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "coordinator/shard_map.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "utils/result.hpp"
template <typename TStorageClient>
class RsmStorageClientManager {
public:
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
RsmStorageClientManager() = default;
RsmStorageClientManager(const RsmStorageClientManager &) = delete;
RsmStorageClientManager(RsmStorageClientManager &&) = delete;
void AddClient(const std::string &label, CompoundKey cm_k, TStorageClient client) {
cli_cache_[label].insert({std::move(cm_k), std::move(client)});
}
bool Exists(const std::string &label, const CompoundKey &cm_k) { return cli_cache_[label].contains(cm_k); }
void PurgeCache() { cli_cache_.clear(); }
// void EvictFromCache(std::vector<TStorageClient>);
TStorageClient &GetClient(const std::string &label, CompoundKey key) { return cli_cache_[label].find(key)->second; }
private:
std::unordered_map<std::string, std::map<CompoundKey, TStorageClient>> cli_cache_;
};
// In execution context an object exists
struct ExecutionState {
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
std::optional<std::vector<Shard>> state_;
std::string label;
// using CompoundKey = memgraph::coordinator::CompoundKey;
std::optional<CompoundKey> key;
};
namespace rsm = memgraph::io::rsm;
// TODO(kostasrim)rename this class template
template <typename TTransport>
class QueryEngineMiddleware {
public:
using StorageClient =
memgraph::coordinator::RsmClient<TTransport, rsm::StorageWriteRequest, rsm::StorageWriteResponse,
rsm::StorageReadRequest, rsm::StorageReadResponse>;
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
using Address = memgraph::io::Address;
using Shard = memgraph::coordinator::Shard;
using ShardMap = memgraph::coordinator::ShardMap;
using CompoundKey = memgraph::coordinator::CompoundKey;
using memgraph::io::Io;
QueryEngineMiddleware(CoordinatorClient coord, Io<TTransport> &&io)
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
std::vector<ScanVerticesResponse> Request(ScanVerticesRequest rqst, ExecutionState &state) {
MaybeUpdateShardMap();
MaybeUpdateExecutionState(state);
std::vector<ScanVerticesResponse> responses;
for (const auto &shard : *state.state_) {
auto &storage_client = GetStorageClientForShard(state.label, *state.key);
auto read_response_result = storage_client.SendReadRequest(rqst);
// RETRY on timeouts?
// Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test
if (read_response_result.HasError()) {
throw std::runtime_error("Handle gracefully!");
}
responses.push_back(read_response_result.Value());
}
// TODO(kostasrim) Update state accordingly
return responses;
// For a future based API. Also maybe introduce a `Retry` function that accepts a lambda which is the request
// and a number denoting the number of times the request is retried until an exception or an error is returned.
// std::vector<memgraph::io::future<ScanAllVerticesRequest>> requests;
// for (const auto &shard : state.state_) {
// auto &storage_client = GetStorageClientForShard(state.Label, rqst.label);
// requests.push_back(client->Request(rqst));
// }
//
// std::vector<ScanAllVerticesResponse> responses;
// for (auto &f : requests) {
// f.wait();
// if (f.HasError()) {
// // handle error
// }
// responses.push_back(std::move(f).Value());
// }
}
CreateVerticesResponse Request(CreateVerticesRequest rqst, ExecutionState &state) {
// MaybeUpdateShardMap();
// MaybeUpdateExecutionState();
}
size_t TestRequest(ExecutionState &state) {
MaybeUpdateShardMap();
MaybeUpdateExecutionState(state);
for (auto &st : *state.state_) {
auto &storage_client = GetStorageClientForShard(state.label, *state.key);
rsm::StorageWriteRequest storage_req;
storage_req.key = *state.key;
storage_req.value = 469;
auto write_response_result = storage_client.SendWriteRequest(storage_req);
if (write_response_result.HasError()) {
throw std::runtime_error("Handle gracefully!");
}
auto write_response = write_response_result.GetValue();
bool cas_succeeded = write_response.shard_rsm_success;
if (!cas_succeeded) {
throw std::runtime_error("Handler gracefully!");
}
rsm::StorageReadRequest storage_get_req;
storage_get_req.key = *state.key;
auto get_response_result = storage_client.SendReadRequest(storage_get_req);
if (get_response_result.HasError()) {
throw std::runtime_error("Handler gracefully!");
}
auto get_response = get_response_result.GetValue();
auto val = get_response.value.value();
return val;
}
return 0;
}
private:
void MaybeUpdateShardMap() {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto read_res = coord_cli_.SendReadRequest(req);
if (read_res.HasError()) {
// handle error gracefully
// throw some error
}
auto coordinator_read_response = read_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_read_response);
if (hlc_response.fresher_shard_map) {
// error here new shard map shouldn't exist
}
// Transaction ID to be used later...
auto transaction_id = hlc_response.new_hlc;
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
} else {
throw std::runtime_error("Should handle gracefully!");
}
}
void MaybeUpdateExecutionState(ExecutionState &state) {
if (state.state_) {
return;
}
state.state_ = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[state.label];
if (state.key) {
if (auto it = shards.find(*state.key); it != shards.end()) {
state.state_->push_back(it->second);
return;
}
// throw here
}
for (const auto &[key, shard] : shards) {
state.state_->push_back(shard);
}
}
// std::vector<storageclient> GetStorageClientFromShardforRange(const std::string &label, const CompoundKey &start,
// const CompoundKey &end);
StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &cm_k) {
if (storage_cli_manager_.Exists(label, cm_k)) {
return storage_cli_manager_.GetClient(label, cm_k);
}
auto target_shard = shards_map_.GetShardForKey(label, cm_k);
AddStorageClientToManager(std::move(target_shard), label, cm_k);
return storage_cli_manager_.GetClient(label, cm_k);
}
void AddStorageClientToManager(Shard target_shard, const std::string &label, const CompoundKey &cm_k) {
MG_ASSERT(!target_shard.empty());
auto leader_addr = target_shard.front();
std::vector<Address> addresses;
for (auto &address : target_shard) {
addresses.push_back(std::move(address.address));
}
auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses));
storage_cli_manager_.AddClient(label, cm_k, std::move(cli));
}
ShardMap shards_map_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
Io<TTransport> io_;
// TODO(kostasrim) Add batch prefetching
};

223
src/query/v2/requests.hpp Normal file
View File

@ -0,0 +1,223 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
#include <iostream>
#include <map>
#include <optional>
#include <unordered_map>
#include <variant>
#include <vector>
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
/// Hybrid-logical clock
struct Hlc {
uint64_t logical_id;
using Duration = std::chrono::microseconds;
using Time = std::chrono::time_point<std::chrono::system_clock, Duration>;
Time coordinator_wall_clock;
bool operator==(const Hlc &other) const = default;
};
struct Label {
size_t id;
};
// TODO(kostasrim) update this with CompoundKey, same for the rest of the file.
using PrimaryKey = std::vector<memgraph::storage::v3::PropertyValue>;
using VertexId = std::pair<Label, PrimaryKey>;
using Gid = size_t;
using PropertyId = memgraph::storage::v3::PropertyId;
struct EdgeType {
std::string name;
};
struct EdgeId {
VertexId id;
Gid gid;
};
struct Vertex {
VertexId id;
std::vector<Label> labels;
};
struct Edge {
VertexId src;
VertexId dst;
EdgeType type;
};
struct PathPart {
Vertex dst;
Gid edge;
};
struct Path {
Vertex src;
std::vector<PathPart> parts;
};
struct Null {};
struct Value {
enum Type { NILL, BOOL, INT64, DOUBLE, STRING, LIST, MAP, VERTEX, EDGE, PATH };
union {
Null null_v;
bool bool_v;
uint64_t int_v;
double double_v;
std::string string_v;
std::vector<Value> list_v;
std::map<std::string, Value> map_v;
Vertex vertex_v;
Edge edge_v;
Path path_v;
};
Type type;
};
struct ValuesMap {
std::unordered_map<PropertyId, Value> values_map;
};
struct MappedValues {
std::vector<ValuesMap> values_map;
};
struct ListedValues {
std::vector<std::vector<Value>> properties;
};
using Values = std::variant<ListedValues, MappedValues>;
struct Expression {
std::string expression;
};
struct Filter {
std::string filter_expression;
};
enum class OrderingDirection { ASCENDING = 1, DESCENDING = 2 };
struct OrderBy {
Expression expression;
OrderingDirection direction;
};
enum class StorageView { OLD = 0, NEW = 1 };
struct ScanVerticesRequest {
Hlc transaction_id;
size_t start_id;
std::optional<std::vector<std::string>> props_to_return;
std::optional<std::vector<std::string>> filter_expressions;
std::optional<size_t> batch_limit;
StorageView storage_view;
};
struct ScanVerticesResponse {
bool success;
Values values;
std::optional<VertexId> next_start_id;
};
using VertexOrEdgeIds = std::variant<VertexId, EdgeId>;
struct GetPropertiesRequest {
Hlc transaction_id;
VertexOrEdgeIds vertex_or_edge_ids;
std::vector<PropertyId> property_ids;
std::vector<Expression> expressions;
bool only_unique = false;
std::optional<std::vector<OrderBy>> order_by;
std::optional<size_t> limit;
std::optional<Filter> filter;
};
struct GetPropertiesResponse {
bool success;
Values values;
};
enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 };
struct ExpandOneRequest {
Hlc transaction_id;
std::vector<VertexId> src_vertices;
std::vector<EdgeType> edge_types;
EdgeDirection direction;
bool only_unique_neighbor_rows = false;
// The empty optional means return all of the properties, while an empty
// list means do not return any properties
// TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object
// after schema is implemented
// Special values are accepted:
// * __mg__labels
std::optional<std::vector<PropertyId>> src_vertex_properties;
// TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object
// after schema is implemented
// Special values are accepted:
// * __mg__dst_id (Vertex, but without labels)
// * __mg__type (binary)
std::optional<std::vector<PropertyId>> edge_properties;
// QUESTION(antaljanosbenjamin): Maybe also add possibility to expressions evaluated on the source vertex?
// List of expressions evaluated on edges
std::vector<Expression> expressions;
std::optional<std::vector<OrderBy>> order_by;
std::optional<size_t> limit;
std::optional<Filter> filter;
};
struct ExpandOneResultRow {
// NOTE: This struct could be a single Values with columns something like this:
// src_vertex(Vertex), vertex_prop1(Value), vertex_prop2(Value), edges(list<Value>)
// where edges might be a list of:
// 1. list<Value> if only a defined list of edge properties are returned
// 2. map<binary, Value> if all of the edge properties are returned
// The drawback of this is currently the key of the map is always interpreted as a string in Value, not as an
// integer, which should be in case of mapped properties.
Vertex src_vertex;
std::optional<Values> src_vertex_properties;
Values edges;
};
struct ExpandOneResponse {
std::vector<ExpandOneResultRow> result;
};
struct NewVertex {
std::vector<Label> label_ids;
std::map<PropertyId, Value> properties;
};
struct CreateVerticesRequest {
Hlc transaction_id;
std::vector<NewVertex> new_vertices;
};
struct CreateVerticesResponse {
bool success;
};
using ReadRequests = std::variant<ExpandOneRequest, GetPropertiesRequest, ScanVerticesRequest>;
using ReadResponses = std::variant<ExpandOneResponse, GetPropertiesResponse, ScanVerticesResponse>;
using WriteRequests = CreateVerticesRequest;
using WriteResponses = CreateVerticesResponse;

View File

@ -32,3 +32,5 @@ add_simulation_test(raft.cpp address)
add_simulation_test(trial_query_storage/query_storage_test.cpp address)
add_simulation_test(sharded_map.cpp address)
add_simulation_test(middleware.cpp address)

View File

@ -0,0 +1,238 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include <chrono>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "io/address.hpp"
#include "io/errors.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/rsm_client.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/middleware.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/result.hpp"
using memgraph::coordinator::AddressAndStatus;
using memgraph::coordinator::CompoundKey;
using memgraph::coordinator::Coordinator;
using memgraph::coordinator::CoordinatorClient;
using memgraph::coordinator::CoordinatorRsm;
using memgraph::coordinator::HlcRequest;
using memgraph::coordinator::HlcResponse;
using memgraph::coordinator::Shard;
using memgraph::coordinator::ShardMap;
using memgraph::coordinator::Shards;
using memgraph::coordinator::Status;
using memgraph::io::Address;
using memgraph::io::Io;
using memgraph::io::ResponseEnvelope;
using memgraph::io::ResponseFuture;
using memgraph::io::Time;
using memgraph::io::TimedOut;
using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageRsm;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
using memgraph::io::rsm::WriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::utils::BasicResult;
using StorageClient =
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
namespace {
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
ShardMap sm1;
auto &shards = sm1.GetShards();
// 1
std::string label1 = std::string("label1");
auto key1 = memgraph::storage::v3::PropertyValue(3);
auto key2 = memgraph::storage::v3::PropertyValue(4);
CompoundKey cm1 = {key1, key2};
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
Shards shards1;
shards1[cm1] = shard1;
// 2
std::string label2 = std::string("label2");
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey cm2 = {key3, key4};
AddressAndStatus aas2_1{.address = b_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
Shards shards2;
shards2[cm2] = shard2;
shards[label1] = shards1;
shards[label2] = shards2;
return sm1;
}
} // namespace
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteStorageRsm = Raft<SimulatorTransport, StorageRsm, StorageWriteRequest, StorageWriteResponse,
StorageReadRequest, StorageReadResponse>;
template <typename IoImpl>
void RunStorageRaft(
Raft<IoImpl, StorageRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>
server) {
server.Run();
}
int main() {
SimulatorConfig config{
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = false,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},
};
auto simulator = Simulator(config);
Io<SimulatorTransport> cli_io = simulator.RegisterNew();
// Register
Io<SimulatorTransport> a_io_1 = simulator.RegisterNew();
Io<SimulatorTransport> a_io_2 = simulator.RegisterNew();
Io<SimulatorTransport> a_io_3 = simulator.RegisterNew();
Io<SimulatorTransport> b_io_1 = simulator.RegisterNew();
Io<SimulatorTransport> b_io_2 = simulator.RegisterNew();
Io<SimulatorTransport> b_io_3 = simulator.RegisterNew();
// Preconfigure coordinator with kv shard 'A' and 'B'
auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(),
b_io_2.GetAddress(), b_io_3.GetAddress());
// Spin up shard A
std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()};
std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]};
std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, StorageRsm{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, StorageRsm{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, StorageRsm{}};
auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]);
auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]);
// Spin up shard B
std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()};
std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]};
std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, StorageRsm{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, StorageRsm{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, StorageRsm{}};
auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]);
auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]);
// Spin up coordinators
Io<SimulatorTransport> c_io_1 = simulator.RegisterNew();
Io<SimulatorTransport> c_io_2 = simulator.RegisterNew();
Io<SimulatorTransport> c_io_3 = simulator.RegisterNew();
std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()};
std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]};
std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]};
std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]};
ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}};
ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}};
ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}};
auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]);
auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]);
auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); });
simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]);
std::cout << "beginning test after servers have become quiescent" << std::endl;
// Have client contact coordinator RSM for a new transaction ID and
// also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
QueryEngineMiddleware<SimulatorTransport> io(std::move(cli_io), std::move(coordinator_client));
ExecutionState state;
state.key = std::make_optional<CompoundKey>(
std::vector{memgraph::storage::v3::PropertyValue(3), memgraph::storage::v3::PropertyValue(4)});
state.label = "label1";
auto result = io.Request(state);
simulator.ShutDown();
std::cout << "Result is: " << result;
return 0;
}

View File

@ -138,9 +138,9 @@ void RunStorageRaft(
int main() {
SimulatorConfig config{
.drop_percent = 5,
.perform_timeouts = true,
.scramble_messages = true,
.drop_percent = 0,
.perform_timeouts = false,
.scramble_messages = false,
.rng_seed = 0,
.start_time = Time::min() + std::chrono::microseconds{256 * 1024},
.abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024},