Added ScanAll prototype

Kostas Kyrimis 2022-09-05 10:02:21 +03:00
parent 1a2f138e09
commit ba06d29a35
5 changed files with 354 additions and 113 deletions


@@ -16,7 +16,9 @@
#include <iostream>
#include <map>
#include <optional>
#include <random>
#include <set>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <vector>
@@ -32,6 +34,7 @@
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "utils/result.hpp"
template <typename TStorageClient>
@@ -56,7 +59,7 @@ class RsmStorageClientManager {
std::unordered_map<std::string, std::map<CompoundKey, TStorageClient>> cli_cache_;
};
// One object of this type exists per execution context
template <typename TRequest>
struct ExecutionState {
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
@@ -64,39 +67,68 @@ struct ExecutionState {
std::string label;
// using CompoundKey = memgraph::coordinator::CompoundKey;
std::optional<CompoundKey> key;
memgraph::coordinator::Hlc transaction_id;
std::vector<TRequest> requests;
};
namespace rsm = memgraph::io::rsm;
// TODO(kostasrim) Rename this class template
template <typename TTransport>
template <typename TTransport, typename... Rest>
class QueryEngineMiddleware {
public:
using StorageClient =
memgraph::coordinator::RsmClient<TTransport, rsm::StorageWriteRequest, rsm::StorageWriteResponse,
rsm::StorageReadRequest, rsm::StorageReadResponse>;
memgraph::coordinator::RsmClient<TTransport, rsm::StorageWriteRequest, rsm::StorageWriteResponse, Rest...>;
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
using Address = memgraph::io::Address;
using Shard = memgraph::coordinator::Shard;
using ShardMap = memgraph::coordinator::ShardMap;
using CompoundKey = memgraph::coordinator::CompoundKey;
using memgraph::io::Io;
QueryEngineMiddleware(CoordinatorClient coord, Io<TTransport> &&io)
QueryEngineMiddleware(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io)
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
std::vector<ScanVerticesResponse> Request(ScanVerticesRequest rqst, ExecutionState &state) {
MaybeUpdateShardMap();
void StartTransaction() {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto read_res = coord_cli_.SendReadRequest(req);
if (read_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
auto coordinator_read_response = read_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_read_response);
// Transaction ID to be used later...
transaction_id_ = hlc_response.new_hlc;
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
} else {
throw std::runtime_error("Should handle gracefully!");
}
}
std::vector<ScanVerticesResponse> Request(ExecutionState<ScanVerticesRequest> &state) {
MaybeUpdateExecutionState(state);
std::vector<ScanVerticesResponse> responses;
for (const auto &shard : *state.state_) {
auto &storage_client = GetStorageClientForShard(state.label, *state.key);
auto read_response_result = storage_client.SendReadRequest(rqst);
auto &state_ref = *state.state_;
size_t id = 0;
for (auto shard_it = state_ref.begin(); shard_it != state_ref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(state.label, state.requests[id].start_id.second);
auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
// RETRY on timeouts?
// Sometimes this produces a timeout. The temporary solution is a while(true) retry loop, as was done in the
// shard_map test (see the sketch after this function).
if (read_response_result.HasError()) {
throw std::runtime_error("Handle gracefully!");
throw std::runtime_error("Read request error");
}
if (read_response_result.GetValue().success == false) {
throw std::runtime_error("ReadRequest failed");
}
responses.push_back(read_response_result.GetValue());
if (!read_response_result.GetValue().next_start_id) {
shard_it = state_ref.erase(shard_it);
} else {
state.requests[id].start_id.second = read_response_result.GetValue().next_start_id->second;
++shard_it;
}
responses.push_back(read_response_result.Value());
}
// TODO(kostasrim) Update state accordingly
return responses;
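A minimal sketch of the while(true) retry mentioned in the comment above, assuming every error returned by SendReadRequest is a transient simulator timeout; the helper name SendReadRequestWithRetries is hypothetical and not part of this commit.

template <typename TStorageClient, typename TRequest>
auto SendReadRequestWithRetries(TStorageClient &storage_client, const TRequest &request) {
  // Hypothetical helper (assumption): treat every read error as a retryable timeout and resend.
  while (true) {
    auto read_response_result = storage_client.SendReadRequest(request);
    if (!read_response_result.HasError()) {
      return read_response_result;
    }
  }
}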
@@ -118,61 +150,66 @@ class QueryEngineMiddleware {
// }
}
CreateVerticesResponse Request(CreateVerticesRequest rqst, ExecutionState &state) {
// MaybeUpdateShardMap();
// MaybeUpdateExecutionState();
}
// CreateVerticesResponse Request(CreateVerticesRequest rqst, ExecutionState &state) {
// // MaybeUpdateShardMap();
// // MaybeUpdateExecutionState();
// }
size_t TestRequest(ExecutionState &state) {
MaybeUpdateShardMap();
MaybeUpdateExecutionState(state);
for (auto &st : *state.state_) {
auto &storage_client = GetStorageClientForShard(state.label, *state.key);
rsm::StorageWriteRequest storage_req;
storage_req.key = *state.key;
storage_req.value = 469;
auto write_response_result = storage_client.SendWriteRequest(storage_req);
if (write_response_result.HasError()) {
throw std::runtime_error("Handle gracefully!");
}
auto write_response = write_response_result.GetValue();
bool cas_succeeded = write_response.shard_rsm_success;
if (!cas_succeeded) {
throw std::runtime_error("Handler gracefully!");
}
rsm::StorageReadRequest storage_get_req;
storage_get_req.key = *state.key;
auto get_response_result = storage_client.SendReadRequest(storage_get_req);
if (get_response_result.HasError()) {
throw std::runtime_error("Handler gracefully!");
}
auto get_response = get_response_result.GetValue();
auto val = get_response.value.value();
return val;
}
return 0;
}
// size_t TestRequest(ExecutionState &state) {
// MaybeUpdateShardMap(state);
// MaybeUpdateExecutionState(state);
// for (auto &st : *state.state_) {
// auto &storage_client = GetStorageClientForShard(state.label, *state.key);
//
// memgraph::storage::v3::LabelId label_id = shards_map_.labels.at(state.label);
//
// rsm::StorageWriteRequest storage_req;
// storage_req.label_id = label_id;
// storage_req.transaction_id = state.transaction_id;
// storage_req.key = *state.key;
// storage_req.value = 469;
// auto write_response_result = storage_client.SendWriteRequest(storage_req);
// if (write_response_result.HasError()) {
// throw std::runtime_error("Handle gracefully!");
// }
// auto write_response = write_response_result.GetValue();
//
// bool cas_succeeded = write_response.shard_rsm_success;
//
// if (!cas_succeeded) {
// throw std::runtime_error("Handler gracefully!");
// }
// rsm::StorageReadRequest storage_get_req;
// storage_get_req.key = *state.key;
//
// auto get_response_result = storage_client.SendReadRequest(storage_get_req);
// if (get_response_result.HasError()) {
// throw std::runtime_error("Handler gracefully!");
// }
// auto get_response = get_response_result.GetValue();
// auto val = get_response.value.value();
// return val;
// }
// return 0;
// }
private:
void MaybeUpdateShardMap() {
template <typename TRequest>
void MaybeUpdateShardMap(TRequest &state) {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
auto read_res = coord_cli_.SendReadRequest(req);
if (read_res.HasError()) {
// handle error gracefully
// throw some error
throw std::runtime_error("HLC request failed");
}
auto coordinator_read_response = read_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_read_response);
if (hlc_response.fresher_shard_map) {
// error here new shard map shouldn't exist
// throw std::runtime_error("Shouldn'");
}
// Transaction ID to be used later...
auto transaction_id = hlc_response.new_hlc;
state.transaction_id = hlc_response.new_hlc;
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
@@ -181,12 +218,14 @@ class QueryEngineMiddleware {
}
}
void MaybeUpdateExecutionState(ExecutionState &state) {
template <typename TRequest>
void MaybeUpdateExecutionState(TRequest &state) {
if (state.state_) {
return;
}
state.transaction_id = transaction_id_;
state.state_ = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[state.label];
const auto &shards = shards_map_.shards[shards_map_.labels[state.label]];
if (state.key) {
if (auto it = shards.find(*state.key); it != shards.end()) {
state.state_->push_back(it->second);
@@ -200,6 +239,22 @@ class QueryEngineMiddleware {
}
}
void MaybeUpdateExecutionState(ExecutionState<ScanVerticesRequest> &state) {
if (state.state_) {
return;
}
state.transaction_id = transaction_id_;
state.state_ = std::make_optional<std::vector<Shard>>();
const auto &shards = shards_map_.shards[shards_map_.labels[state.label]];
for (const auto &[key, shard] : shards) {
state.state_->push_back(shard);
ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_;
rqst.start_id.second = key;
state.requests.push_back(std::move(rqst));
}
}
// std::vector<storageclient> GetStorageClientFromShardforRange(const std::string &label, const CompoundKey &start,
// const CompoundKey &end);
StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &cm_k) {
@@ -225,6 +280,7 @@ class QueryEngineMiddleware {
ShardMap shards_map_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
Io<TTransport> io_;
memgraph::io::Io<TTransport> io_;
memgraph::coordinator::Hlc transaction_id_;
// TODO(kostasrim) Add batch prefetching
};
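For reference, the intended call sequence for this class is the one exercised by the simulation test later in this commit: StartTransaction() once, then Request(state) repeatedly until every shard has been exhausted. A rough driver sketch, assuming Request returns an empty vector once all shards have been erased from the execution state (the helper name ScanAllVertices is illustrative only):

template <typename TMiddleware>
std::vector<ScanVerticesResponse> ScanAllVertices(TMiddleware &middleware, std::string label) {
  // Illustrative driver loop; ExecutionState, ScanVerticesRequest and ScanVerticesResponse are the
  // types used above, while the empty-batch exhaustion condition is an assumption.
  ExecutionState<ScanVerticesRequest> state;
  state.label = std::move(label);
  middleware.StartTransaction();
  std::vector<ScanVerticesResponse> all_responses;
  for (auto batch = middleware.Request(state); !batch.empty(); batch = middleware.Request(state)) {
    all_responses.insert(all_responses.end(), batch.begin(), batch.end());
  }
  return all_responses;
}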


@@ -19,18 +19,11 @@
#include <variant>
#include <vector>
#include "coordinator/hybrid_logical_clock.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
/// Hybrid-logical clock
struct Hlc {
uint64_t logical_id;
using Duration = std::chrono::microseconds;
using Time = std::chrono::time_point<std::chrono::system_clock, Duration>;
Time coordinator_wall_clock;
bool operator==(const Hlc &other) const = default;
};
using memgraph::coordinator::Hlc;
struct Label {
size_t id;
@@ -81,12 +74,12 @@ struct Value {
bool bool_v;
uint64_t int_v;
double double_v;
std::string string_v;
std::vector<Value> list_v;
std::map<std::string, Value> map_v;
Vertex vertex_v;
Edge edge_v;
Path path_v;
// std::string string_v;
// std::vector<Value> list_v;
// std::map<std::string, Value> map_v;
// Vertex vertex_v;
// Edge edge_v;
// Path path_v;
};
Type type;
@@ -125,7 +118,7 @@ enum class StorageView { OLD = 0, NEW = 1 };
struct ScanVerticesRequest {
Hlc transaction_id;
size_t start_id;
VertexId start_id;
std::optional<std::vector<std::string>> props_to_return;
std::optional<std::vector<std::string>> filter_expressions;
std::optional<size_t> batch_limit;

tests/simulation/common.hpp (new file, 168 additions)

@@ -0,0 +1,168 @@
// Copyright 2022 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
/// The ShardRsm is a simple in-memory Raft-backed kv store that can be used for testing and for
/// implementing some query engine logic before the storage engines are fully implemented.
///
/// To implement multiple read and write commands, change the StorageRead* and StorageWrite* requests
/// and responses to a std::variant of the different options, and route them to specific handlers in
/// the ShardRsm's Read and Apply methods. Remember that Read is called immediately when the Raft
/// leader receives the request, and does not replicate anything over Raft. Apply is called only
/// AFTER the StorageWriteRequest is replicated to a majority of Raft peers, and the result of calling
/// ShardRsm::Apply(StorageWriteRequest) is returned to the client that submitted the request.
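A hedged illustration of the std::variant routing suggested above; every type name below (GetRequest, CountRequest, ExampleRsm, ...) is a placeholder invented for this sketch rather than a type from this commit.

#include <cstddef>
#include <map>
#include <optional>
#include <variant>

struct GetRequest { int key; };
struct CountRequest {};
struct GetResponse { std::optional<int> value; };
struct CountResponse { size_t count; };

using ReadRequests = std::variant<GetRequest, CountRequest>;
using ReadResponses = std::variant<GetResponse, CountResponse>;

class ExampleRsm {
  std::map<int, int> state_;

 public:
  // Read runs immediately on the Raft leader and replicates nothing over Raft.
  ReadResponses Read(ReadRequests requests) {
    return std::visit([this](auto &&request) -> ReadResponses { return Handle(request); }, requests);
  }

 private:
  GetResponse Handle(const GetRequest &request) {
    if (auto it = state_.find(request.key); it != state_.end()) {
      return GetResponse{it->second};
    }
    return GetResponse{std::nullopt};
  }
  CountResponse Handle(const CountRequest & /*request*/) { return CountResponse{state_.size()}; }
};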
#include <algorithm>
#include <deque>
#include <iostream>
#include <map>
#include <optional>
#include <set>
#include <thread>
#include <vector>
#include "coordinator/hybrid_logical_clock.hpp"
#include "io/address.hpp"
#include "io/rsm/raft.hpp"
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/logging.hpp"
using memgraph::coordinator::Hlc;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::storage::v3::PropertyValue;
using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>;
class ShardRsmV2 {
std::map<ShardRsmKey, int> state_;
ShardRsmKey minimum_key_;
std::optional<ShardRsmKey> maximum_key_{std::nullopt};
Hlc shard_map_version_;
// Returns true when the key is located in this shard's key range
bool IsKeyInRange(const ShardRsmKey &key) {
if (maximum_key_) [[likely]] {
return (key >= minimum_key_ && key <= maximum_key_);
}
return key >= minimum_key_;
}
public:
// ExpandOneResponse Read(ExpandOneRequest rqst);
// GetPropertiesResponse Read(GetPropertiesRequest rqst);
ScanVerticesResponse Read(ScanVerticesRequest rqst) {
ScanVerticesResponse ret;
if (!IsKeyInRange(rqst.start_id.second)) {
ret.success = false;
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) {
Value val{.int_v = 0, .type = Value::Type::INT64};
ListedValues listed_values;
listed_values.properties.push_back(std::vector<Value>{val});
ret.next_start_id = std::make_optional<VertexId>();
ret.next_start_id->second = ShardRsmKey{PropertyValue(1), PropertyValue(0)};
ret.values = std::move(listed_values);
ret.success = true;
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) {
Value val{.int_v = 1, .type = Value::Type::INT64};
ListedValues listed_values;
listed_values.properties.push_back(std::vector<Value>{val});
ret.values = std::move(listed_values);
ret.success = true;
} else if (rqst.start_id.second == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) {
Value val{.int_v = 444, .type = Value::Type::INT64};
ListedValues listed_values;
listed_values.properties.push_back(std::vector<Value>{val});
ret.values = std::move(listed_values);
ret.success = true;
} else {
ret.success = false;
}
return ret;
}
// StorageReadResponse Read(StorageReadRequest request) {
// StorageReadResponse ret;
//
// if (!IsKeyInRange(request.key)) {
// ret.latest_known_shard_map_version = shard_map_version_;
// ret.shard_rsm_success = false;
// } else if (state_.contains(request.key)) {
// ret.value = state_[request.key];
// ret.shard_rsm_success = true;
// } else {
// ret.shard_rsm_success = false;
// ret.value = std::nullopt;
// }
// return ret;
// }
//
StorageWriteResponse Apply(StorageWriteRequest request) {
StorageWriteResponse ret;
// Key is outside this shard's range
if (!IsKeyInRange(request.key)) {
ret.latest_known_shard_map_version = shard_map_version_;
ret.shard_rsm_success = false;
}
// Key already exists
else if (state_.contains(request.key)) {
auto &val = state_[request.key];
/*
* Delete
*/
if (!request.value) {
ret.shard_rsm_success = true;
ret.last_value = val;
state_.erase(state_.find(request.key));
}
/*
* Update
*/
// Does old_value match? (else-if so val is not read after the Delete branch erased it)
else if (request.value == val) {
ret.last_value = val;
ret.shard_rsm_success = true;
val = request.value.value();
} else {
ret.last_value = val;
ret.shard_rsm_success = false;
}
}
/*
* Create
*/
else {
ret.last_value = std::nullopt;
ret.shard_rsm_success = true;
state_.emplace(request.key, std::move(request.value).value());
}
return ret;
}
};


@@ -18,6 +18,7 @@
#include <thread>
#include <vector>
#include "common.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "io/address.hpp"
@@ -28,6 +29,7 @@
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/middleware.hpp"
#include "query/v2/requests.hpp"
#include "storage/v2/property_value.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/result.hpp"
@@ -53,9 +55,9 @@ using memgraph::io::rsm::Raft;
using memgraph::io::rsm::ReadRequest;
using memgraph::io::rsm::ReadResponse;
using memgraph::io::rsm::RsmClient;
using memgraph::io::rsm::ShardRsm;
using memgraph::io::rsm::StorageReadRequest;
using memgraph::io::rsm::StorageReadResponse;
using memgraph::io::rsm::StorageRsm;
using memgraph::io::rsm::StorageWriteRequest;
using memgraph::io::rsm::StorageWriteResponse;
using memgraph::io::rsm::WriteRequest;
@@ -64,59 +66,63 @@ using memgraph::io::simulator::Simulator;
using memgraph::io::simulator::SimulatorConfig;
using memgraph::io::simulator::SimulatorStats;
using memgraph::io::simulator::SimulatorTransport;
using memgraph::storage::v3::LabelId;
using memgraph::utils::BasicResult;
using StorageClient =
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>;
using ShardClient =
RsmClient<SimulatorTransport, StorageWriteRequest, StorageWriteResponse, ScanVerticesRequest, ScanVerticesResponse>;
namespace {
ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2,
memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1,
memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) {
ShardMap sm1;
auto &shards = sm1.GetShards();
static const std::string label_name = std::string("test_label");
ShardMap sm;
// 1
std::string label1 = std::string("label1");
auto key1 = memgraph::storage::v3::PropertyValue(3);
auto key2 = memgraph::storage::v3::PropertyValue(4);
CompoundKey cm1 = {key1, key2};
// register new label space
bool label_success = sm.InitializeNewLabel(label_name, sm.shard_map_version);
MG_ASSERT(label_success);
LabelId label_id = sm.labels.at(label_name);
Shards &shards_for_label = sm.shards.at(label_id);
// add first shard at [0, 0]
AddressAndStatus aas1_1{.address = a_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_2{.address = a_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas1_3{.address = a_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard1 = {aas1_1, aas1_2, aas1_3};
Shards shards1;
shards1[cm1] = shard1;
// 2
std::string label2 = std::string("label2");
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey cm2 = {key3, key4};
auto key1 = memgraph::storage::v3::PropertyValue(0);
auto key2 = memgraph::storage::v3::PropertyValue(0);
CompoundKey compound_key_1 = {key1, key2};
shards_for_label[compound_key_1] = shard1;
// add second shard at [12, 13]
AddressAndStatus aas2_1{.address = b_io_1, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_2{.address = b_io_2, .status = Status::CONSENSUS_PARTICIPANT};
AddressAndStatus aas2_3{.address = b_io_3, .status = Status::CONSENSUS_PARTICIPANT};
Shard shard2 = {aas2_1, aas2_2, aas2_3};
Shards shards2;
shards2[cm2] = shard2;
shards[label1] = shards1;
shards[label2] = shards2;
auto key3 = memgraph::storage::v3::PropertyValue(12);
auto key4 = memgraph::storage::v3::PropertyValue(13);
CompoundKey compound_key_2 = {key3, key4};
shards_for_label[compound_key_2] = shard2;
return sm1;
return sm;
}
} // namespace
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteStorageRsm = Raft<SimulatorTransport, StorageRsm, StorageWriteRequest, StorageWriteResponse,
StorageReadRequest, StorageReadResponse>;
using ConcreteStorageRsm = Raft<SimulatorTransport, ShardRsmV2, StorageWriteRequest, StorageWriteResponse,
ScanVerticesRequest, ScanVerticesResponse>;
template <typename IoImpl>
void RunStorageRaft(
Raft<IoImpl, StorageRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse>
Raft<IoImpl, ShardRsmV2, StorageWriteRequest, StorageWriteResponse, ScanVerticesRequest, ScanVerticesResponse>
server) {
server.Run();
}
@@ -159,9 +165,9 @@ int main() {
std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]};
std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, StorageRsm{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, StorageRsm{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, StorageRsm{}};
ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, ShardRsmV2{}};
ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, ShardRsmV2{}};
ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, ShardRsmV2{}};
auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1));
simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]);
@@ -179,9 +185,9 @@ int main() {
std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]};
std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, StorageRsm{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, StorageRsm{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, StorageRsm{}};
ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, ShardRsmV2{}};
ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, ShardRsmV2{}};
ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, ShardRsmV2{}};
auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1));
simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]);
@@ -223,16 +229,32 @@ int main() {
// also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
QueryEngineMiddleware<SimulatorTransport> io(std::move(cli_io), std::move(coordinator_client));
QueryEngineMiddleware<SimulatorTransport, ScanVerticesRequest, ScanVerticesResponse> io(std::move(coordinator_client),
std::move(cli_io));
ExecutionState state;
ExecutionState<ScanVerticesRequest> state;
state.key = std::make_optional<CompoundKey>(
std::vector{memgraph::storage::v3::PropertyValue(3), memgraph::storage::v3::PropertyValue(4)});
state.label = "label1";
std::vector{memgraph::storage::v3::PropertyValue(0), memgraph::storage::v3::PropertyValue(0)});
state.label = "test_label";
// auto result = io.TestRequest(state);
io.StartTransaction();
auto result = io.Request(state);
simulator.ShutDown();
std::cout << "Result is: " << result;
auto &list_of_values = std::get<ListedValues>(result[0].values);
std::cout << "Result is: " << list_of_values.properties[0][0].int_v << std::endl;
auto &list_of_values_v2 = std::get<ListedValues>(result[1].values);
std::cout << "Result is: " << list_of_values_v2.properties[0][0].int_v << std::endl;
result = io.Request(state);
std::cout << "Result is: " << result.size() << std::endl;
// auto &list_of_values2 = std::get<ListedValues>(result[0].values);
// std::cout << "Result is: " << list_of_values2.properties[0][0].int_v << std::endl;
// exhaust it
result = io.Request(state);
std::cout << "Result is: " << result.size() << std::endl;
simulator.ShutDown();
return 0;
}


@@ -27,6 +27,7 @@
#include "io/rsm/shard_rsm.hpp"
#include "io/simulator/simulator.hpp"
#include "io/simulator/simulator_transport.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/id_types.hpp"
#include "utils/result.hpp"
@@ -128,11 +129,12 @@ std::optional<ShardClient> DetermineShardLocation(Shard target_shard, const std:
using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>;
using ConcreteShardRsm = Raft<SimulatorTransport, ShardRsm, StorageWriteRequest, StorageWriteResponse,
StorageReadRequest, StorageReadResponse>;
ScanVerticesRequest, ScanVerticesResponse>;
template <typename IoImpl>
void RunStorageRaft(
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, StorageReadRequest, StorageReadResponse> server) {
Raft<IoImpl, ShardRsm, StorageWriteRequest, StorageWriteResponse, ScanVerticesRequest, ScanVerticesResponse>
server) {
server.Run();
}