Stitch request manager with shard (#570)

Fix various bugs
Tyler Neely 2022-09-23 20:07:41 +02:00 committed by GitHub
parent 925835b080
commit b5c7078c7d
11 changed files with 81 additions and 78 deletions

View File

@@ -158,10 +158,10 @@ class Coordinator {
private:
ShardMap shard_map_;
-uint64_t highest_allocated_timestamp_;
+uint64_t highest_allocated_timestamp_{0};
/// Query engines need to periodically request batches of unique edge IDs.
-uint64_t highest_allocated_edge_id_;
+uint64_t highest_allocated_edge_id_{0};
CoordinatorReadResponses HandleRead(GetShardMapRequest && /* get_shard_map_request */) {
GetShardMapResponse res;
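
The two {0} initializers matter because a default-constructed Coordinator would otherwise read indeterminate values the first time it allocates a timestamp or an edge ID. A minimal, self-contained sketch of the hazard being closed (the struct below is illustrative, not from the patch):

#include <cstdint>

struct IdAllocator {
  uint64_t next_id;          // indeterminate after default construction; reading it is UB
  uint64_t next_id_safe{0};  // in-class initializer: allocation reliably starts at 0
};

int main() {
  IdAllocator a;                            // default-constructed, as the Coordinator is
  return static_cast<int>(a.next_id_safe);  // well-defined; reading a.next_id here would not be
}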

View File

@@ -21,8 +21,8 @@ using Time = memgraph::io::Time;
/// Hybrid-logical clock
struct Hlc {
-uint64_t logical_id;
-Time coordinator_wall_clock;
+uint64_t logical_id = 0;
+Time coordinator_wall_clock = Time::min();
auto operator<=>(const Hlc &other) const { return logical_id <=> other.logical_id; }
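
Since operator<=> compares only logical_id, two hybrid-logical clocks order the same regardless of their wall-clock readings; coordinator_wall_clock is carried for observability, and the new defaults make a freshly constructed Hlc the smallest possible clock. A hedged sketch of the resulting semantics (illustrative, not from the patch):

Hlc a{.logical_id = 1};                                         // wall clock defaults to Time::min()
Hlc b{.logical_id = 2, .coordinator_wall_clock = Time::min()};
MG_ASSERT(a < b);      // true purely because 1 < 2; wall clocks never participate
MG_ASSERT(Hlc{} < a);  // a default Hlc compares lowest: logical_id 0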

View File

@@ -30,6 +30,8 @@
namespace memgraph::coordinator {
+constexpr int64_t kNotExistingId{0};
using memgraph::io::Address;
using memgraph::storage::v3::Config;
using memgraph::storage::v3::LabelId;
@@ -63,6 +65,7 @@ struct ShardToInitialize {
LabelId label_id;
PrimaryKey min_key;
std::optional<PrimaryKey> max_key;
+std::vector<SchemaProperty> schema;
Config config;
};
@@ -76,9 +79,9 @@ struct LabelSpace {
struct ShardMap {
Hlc shard_map_version;
-uint64_t max_property_id;
+uint64_t max_property_id{kNotExistingId};
std::map<PropertyName, PropertyId> properties;
-uint64_t max_label_id;
+uint64_t max_label_id{kNotExistingId};
std::map<LabelName, LabelId> labels;
std::map<LabelId, LabelSpace> label_spaces;
std::map<LabelId, std::vector<SchemaProperty>> schemas;
@@ -124,6 +127,7 @@ struct ShardMap {
.label_id = label_id,
.min_key = low_key,
.max_key = std::nullopt,
+.schema = label_space.schema,
.config = Config{},
});
}

View File

@@ -156,7 +156,7 @@ class MachineManager {
AppendResponse, WriteRequest<StorageWriteRequest>, VoteRequest, VoteResponse>(
std::move(request_envelope.message));
-MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {}",
+MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {} - incorrect message type",
request_envelope.to_address.ToString());
spdlog::info("got shard rsm message");

View File

@@ -382,7 +382,7 @@ struct ScanVerticesRequest {
std::optional<std::vector<PropertyId>> props_to_return;
std::optional<std::vector<std::string>> filter_expressions;
std::optional<size_t> batch_limit;
-StorageView storage_view;
+StorageView storage_view{StorageView::NEW};
};
struct ScanResultRow {
@@ -492,7 +492,6 @@ struct NewVertexLabel {
};
struct CreateVerticesRequest {
-std::string label;
Hlc transaction_id;
std::vector<NewVertex> new_vertices;
};

View File

@@ -23,6 +23,7 @@
#include <unordered_map>
#include <vector>
+#include "coordinator/coordinator.hpp"
#include "coordinator/coordinator_client.hpp"
#include "coordinator/coordinator_rsm.hpp"
#include "coordinator/shard_map.hpp"
@@ -117,12 +118,9 @@ class ShardRequestManagerInterface {
template <typename TTransport>
class ShardRequestManager : public ShardRequestManagerInterface {
public:
-using WriteRequests = CreateVerticesRequest;
-using WriteResponses = CreateVerticesResponse;
-using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>;
-using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>;
using StorageClient =
memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
+using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests;
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
using Address = memgraph::io::Address;
using Shard = memgraph::coordinator::Shard;
@@ -141,7 +139,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
void StartTransaction() override {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
-auto write_res = coord_cli_.SendWriteRequest(req);
+CoordinatorWriteRequests write_req = req;
+auto write_res = coord_cli_.SendWriteRequest(write_req);
if (write_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
@@ -180,13 +179,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
*state.label, storage::conversions::ConvertPropertyVector(state.requests[id].start_id.second));
// TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works with MgFuture
// instead.
-auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
+ReadRequests req = state.requests[id];
+auto read_response_result = storage_client.SendReadRequest(req);
// RETRY on timeouts?
// Sometimes this produces a timeout. The temporary solution is a while (true) loop, as was done in the shard_map test.
if (read_response_result.HasError()) {
throw std::runtime_error("ScanAll request timed out");
}
-auto &response = std::get<ScanVerticesResponse>(read_response_result.GetValue());
+ReadResponses read_response_variant = read_response_result.GetValue();
+auto &response = std::get<ScanVerticesResponse>(read_response_variant);
if (!response.success) {
throw std::runtime_error("ScanAll request did not succeed");
}
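
The conversions added here and in the hunks below all follow from the RsmClient signature: SendReadRequest and SendWriteRequest take the variant aliases (ReadRequests, WriteRequests) rather than a concrete message type, so each call site first widens its concrete request into the variant and then narrows the variant response with std::get. A hedged sketch of the round trip, assuming the ReadRequests/ReadResponses variants from requests.hpp:

ScanVerticesRequest scan_req;                    // concrete message, fields omitted
ReadRequests req = scan_req;                     // widen into the client's request variant
auto result = storage_client.SendReadRequest(req);
if (result.HasError()) {
  // timeout: retry or propagate, as the TODOs above note
}
ReadResponses resp = result.GetValue();          // the response comes back as a variant
auto &scan_resp = std::get<ScanVerticesResponse>(resp);  // narrow; throws std::bad_variant_access on a mismatch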
@@ -215,18 +216,26 @@
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
// This is fine because all new_vertices of each request end up on the same shard
const auto labels = state.requests[id].new_vertices[0].label_ids;
+for (auto &new_vertex : state.requests[id].new_vertices) {
+new_vertex.label_ids.erase(new_vertex.label_ids.begin());
+}
auto primary_key = state.requests[id].new_vertices[0].primary_key;
auto &storage_client = GetStorageClientForShard(*shard_it, labels[0].id);
-auto write_response_result = storage_client.SendWriteRequest(state.requests[id]);
+WriteRequests req = state.requests[id];
+auto write_response_result = storage_client.SendWriteRequest(req);
// RETRY on timeouts?
// Sometimes this produces a timeout. The temporary solution is a while (true) loop, as was done in the shard_map test.
if (write_response_result.HasError()) {
throw std::runtime_error("CreateVertices request timed out");
}
-if (!write_response_result.GetValue().success) {
+WriteResponses response_variant = write_response_result.GetValue();
+CreateVerticesResponse mapped_response = std::get<CreateVerticesResponse>(response_variant);
+if (!mapped_response.success) {
throw std::runtime_error("CreateVertices request did not succeed");
}
-responses.push_back(write_response_result.GetValue());
+responses.push_back(mapped_response);
shard_it = shard_cache_ref.erase(shard_it);
}
// We are done with this state
@@ -250,7 +259,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
const Label primary_label = state.requests[id].src_vertices[0].first;
auto &storage_client = GetStorageClientForShard(*shard_it, primary_label.id);
-auto read_response_result = storage_client.SendReadRequest(state.requests[id]);
+ReadRequests req = state.requests[id];
+auto read_response_result = storage_client.SendReadRequest(req);
// RETRY on timeouts?
// Sometimes this produces a timeout. The temporary solution is a while (true) loop, as was done in the shard_map test.
if (read_response_result.HasError()) {
@@ -303,6 +313,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
std::map<Shard, CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
+MG_ASSERT(!new_vertex.label_ids.empty(), "A new vertex must have at least one label");
auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id,
storage::conversions::ConvertPropertyVector(new_vertex.primary_key));
if (!per_shard_request_table.contains(shard)) {
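
The hunk above is cut off by the diff context, but it is the request-building step: vertices are grouped by owning shard before any traffic is sent, so each shard receives one batched CreateVerticesRequest rather than one request per vertex. A simplified sketch of how the table is presumably filled (the transaction_id_ member and the insertion details are assumptions):

std::map<Shard, CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
  MG_ASSERT(!new_vertex.label_ids.empty(), "A new vertex must have at least one label");
  auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id,
                                          storage::conversions::ConvertPropertyVector(new_vertex.primary_key));
  if (!per_shard_request_table.contains(shard)) {
    // assumed: one request per shard, stamped with the manager's current transaction
    per_shard_request_table.insert({shard, CreateVerticesRequest{.transaction_id = transaction_id_}});
  }
  per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex));
}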

View File

@@ -320,7 +320,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const {
}
Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
-const std::optional<PrimaryKey> max_primary_key, Config config)
+const std::optional<PrimaryKey> max_primary_key, std::vector<SchemaProperty> schema, Config config)
: primary_label_{primary_label},
min_primary_key_{min_primary_key},
max_primary_key_{max_primary_key},
@@ -331,7 +331,9 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key,
config_{config},
uuid_{utils::GenerateUUID()},
epoch_id_{utils::GenerateUUID()},
-global_locker_{file_retainer_.AddLocker()} {}
+global_locker_{file_retainer_.AddLocker()} {
+CreateSchema(primary_label_, schema);
+}
Shard::~Shard() {}

View File

@@ -190,7 +190,7 @@ class Shard final {
/// @throw std::system_error
/// @throw std::bad_alloc
explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional<PrimaryKey> max_primary_key,
-Config config = Config());
+std::vector<SchemaProperty> schema, Config config = Config());
Shard(const Shard &) = delete;
Shard(Shard &&) noexcept = delete;

View File

@@ -211,7 +211,7 @@ class ShardManager {
std::vector<Address> rsm_peers = {};
std::unique_ptr<Shard> shard =
-std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key, to_init.config);
+std::make_unique<Shard>(to_init.label_id, to_init.min_key, to_init.max_key, to_init.schema, to_init.config);
ShardRsm rsm_state{std::move(shard)};

View File

@@ -432,4 +432,4 @@ target_link_libraries(${test_prefix}local_transport mg-io)
# Test MachineManager with LocalTransport
add_unit_test(machine_manager.cpp)
-target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3)
+target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3 mg-query-v2)

View File

@@ -27,6 +27,7 @@
#include <machine_manager/machine_manager.hpp>
#include <query/v2/requests.hpp>
#include "io/rsm/rsm_client.hpp"
+#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp"
@@ -105,6 +106,37 @@ ShardMap TestShardMap() {
return sm;
}
+template <typename ShardRequestManager>
+void TestScanAll(ShardRequestManager &shard_request_manager) {
+msgs::ExecutionState<msgs::ScanVerticesRequest> state{.label = "test_label"};
+auto result = shard_request_manager.Request(state);
+MG_ASSERT(result.size() == 2, "{}", result.size());
+}
+template <typename ShardRequestManager>
+void TestCreateVertices(ShardRequestManager &shard_request_manager) {
+using PropVal = msgs::Value;
+msgs::ExecutionState<msgs::CreateVerticesRequest> state;
+std::vector<msgs::NewVertex> new_vertices;
+auto label_id = shard_request_manager.LabelNameToLabelId("test_label");
+msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}};
+a1.label_ids.push_back({label_id});
+msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
+a2.label_ids.push_back({label_id});
+new_vertices.push_back(std::move(a1));
+new_vertices.push_back(std::move(a2));
+auto result = shard_request_manager.Request(state, std::move(new_vertices));
+MG_ASSERT(result.size() == 1);
+}
+template <typename ShardRequestManager>
+void TestExpand(ShardRequestManager &shard_request_manager) {}
+template <typename ShardRequestManager>
+void TestAggregate(ShardRequestManager &shard_request_manager) {}
MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
ShardMap shard_map) {
MachineConfig config{
@@ -128,6 +160,9 @@ void WaitForShardsToInitialize(CoordinatorClient<LocalTransport> &cc) {
// TODO(tyler) call coordinator client's read method for GetShardMap
// and keep reading it until the shard map contains proper replicas
// for each shard in the label space.
+using namespace std::chrono_literals;
+std::this_thread::sleep_for(2010ms);
}
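
The TODO describes the intended replacement for the fixed 2010 ms sleep: poll the coordinator's GetShardMap read until every shard reports a usable replica set. A hedged sketch of such a loop; GetShardMapRequest and GetShardMapResponse appear in the coordinator hunk above, but the readiness predicate here is an assumption:

void WaitForShardsToInitialize(CoordinatorClient<LocalTransport> &cc) {
  using namespace std::chrono_literals;
  while (true) {
    auto read_res = cc.SendReadRequest(memgraph::coordinator::GetShardMapRequest{});
    if (!read_res.HasError()) {
      auto response = std::get<memgraph::coordinator::GetShardMapResponse>(read_res.GetValue());
      // AllShardsHavePeers is hypothetical: true once every shard in every
      // label space has its replicas registered in the shard map.
      if (AllShardsHavePeers(response.shard_map)) {
        return;
      }
    }
    std::this_thread::sleep_for(50ms);  // back off between polls
  }
}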
TEST(MachineManager, BasicFunctionality) {
@@ -154,61 +189,13 @@ TEST(MachineManager, BasicFunctionality) {
WaitForShardsToInitialize(cc);
-using namespace std::chrono_literals;
-std::this_thread::sleep_for(2010ms);
+CoordinatorClient<LocalTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
-// get ShardMap from coordinator
-memgraph::coordinator::HlcRequest req;
-req.last_shard_map_version = Hlc{
-.logical_id = 0,
-};
-BasicResult<TimedOut, memgraph::coordinator::CoordinatorWriteResponses> read_res = cc.SendWriteRequest(req);
-MG_ASSERT(!read_res.HasError(), "HLC request unexpectedly timed out");
-auto coordinator_read_response = read_res.GetValue();
-HlcResponse hlc_response = std::get<HlcResponse>(coordinator_read_response);
-ShardMap sm = hlc_response.fresher_shard_map.value();
-// Get shard for key and create rsm client
-const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3);
-const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4);
-const CompoundKey compound_key = {cm_key_1, cm_key_2};
-std::string label_name = "test_label";
-Shard shard_for_key = sm.GetShardForKey(label_name, compound_key);
-auto shard_for_client = std::vector<Address>{};
-for (const auto &aas : shard_for_key) {
-spdlog::info("got address for shard: {}", aas.address.ToString());
-shard_for_client.push_back(aas.address);
-}
-ShardClient shard_client{cli_io, shard_for_client[0], shard_for_client};
-// submit a read request and assert that the requested key does not yet exist
-LabelId label_id = sm.labels.at(label_name);
-ReadRequests storage_get_req;
-/*
-TODO(tyler,kostas) set this to a real request
-storage_get_req.label_id = label_id;
-storage_get_req.key = compound_key;
-storage_get_req.transaction_id = hlc_response.new_hlc;
-*/
-auto get_response_result = shard_client.SendReadRequest(storage_get_req);
-auto get_response = get_response_result.GetValue();
-/*
-auto val = get_response.value;
-MG_ASSERT(!val.has_value());
-*/
+msgs::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), std::move(cli_io));
+shard_request_manager.StartTransaction();
+TestCreateVertices(shard_request_manager);
+TestScanAll(shard_request_manager);
local_system.ShutDown();
};