From b5c7078c7d8cc985f2d8085b0e336dd99e47c306 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 23 Sep 2022 20:07:41 +0200 Subject: [PATCH] Stitch request manager with shard (#570) Fix various bugs --- src/coordinator/coordinator.hpp | 4 +- src/coordinator/hybrid_logical_clock.hpp | 4 +- src/coordinator/shard_map.hpp | 8 +- src/machine_manager/machine_manager.hpp | 2 +- src/query/v2/requests.hpp | 3 +- src/query/v2/shard_request_manager.hpp | 33 ++++++--- src/storage/v3/shard.cpp | 6 +- src/storage/v3/shard.hpp | 2 +- src/storage/v3/shard_manager.hpp | 2 +- tests/unit/CMakeLists.txt | 2 +- tests/unit/machine_manager.cpp | 93 ++++++++++-------------- 11 files changed, 81 insertions(+), 78 deletions(-) diff --git a/src/coordinator/coordinator.hpp b/src/coordinator/coordinator.hpp index 31e20bb9f..1a813d90b 100644 --- a/src/coordinator/coordinator.hpp +++ b/src/coordinator/coordinator.hpp @@ -158,10 +158,10 @@ class Coordinator { private: ShardMap shard_map_; - uint64_t highest_allocated_timestamp_; + uint64_t highest_allocated_timestamp_{0}; /// Query engines need to periodically request batches of unique edge IDs. 
- uint64_t highest_allocated_edge_id_; + uint64_t highest_allocated_edge_id_{0}; CoordinatorReadResponses HandleRead(GetShardMapRequest && /* get_shard_map_request */) { GetShardMapResponse res; diff --git a/src/coordinator/hybrid_logical_clock.hpp b/src/coordinator/hybrid_logical_clock.hpp index caea92425..04f6eba26 100644 --- a/src/coordinator/hybrid_logical_clock.hpp +++ b/src/coordinator/hybrid_logical_clock.hpp @@ -21,8 +21,8 @@ using Time = memgraph::io::Time; /// Hybrid-logical clock struct Hlc { - uint64_t logical_id; - Time coordinator_wall_clock; + uint64_t logical_id = 0; + Time coordinator_wall_clock = Time::min(); auto operator<=>(const Hlc &other) const { return logical_id <=> other.logical_id; } diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index 44759fcca..b416ea330 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -30,6 +30,8 @@ namespace memgraph::coordinator { +constexpr int64_t kNotExistingId{0}; + using memgraph::io::Address; using memgraph::storage::v3::Config; using memgraph::storage::v3::LabelId; @@ -63,6 +65,7 @@ struct ShardToInitialize { LabelId label_id; PrimaryKey min_key; std::optional max_key; + std::vector schema; Config config; }; @@ -76,9 +79,9 @@ struct LabelSpace { struct ShardMap { Hlc shard_map_version; - uint64_t max_property_id; + uint64_t max_property_id{kNotExistingId}; std::map properties; - uint64_t max_label_id; + uint64_t max_label_id{kNotExistingId}; std::map labels; std::map label_spaces; std::map> schemas; @@ -124,6 +127,7 @@ struct ShardMap { .label_id = label_id, .min_key = low_key, .max_key = std::nullopt, + .schema = label_space.schema, .config = Config{}, }); } diff --git a/src/machine_manager/machine_manager.hpp b/src/machine_manager/machine_manager.hpp index fe18b18b2..5be419189 100644 --- a/src/machine_manager/machine_manager.hpp +++ b/src/machine_manager/machine_manager.hpp @@ -156,7 +156,7 @@ class MachineManager { AppendResponse, WriteRequest, 
VoteRequest, VoteResponse>( std::move(request_envelope.message)); - MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {}", + MG_ASSERT(conversion_attempt.has_value(), "shard rsm message conversion failed for {} - incorrect message type", request_envelope.to_address.ToString()); spdlog::info("got shard rsm message"); diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 87a8930d8..476634701 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -382,7 +382,7 @@ struct ScanVerticesRequest { std::optional> props_to_return; std::optional> filter_expressions; std::optional batch_limit; - StorageView storage_view; + StorageView storage_view{StorageView::NEW}; }; struct ScanResultRow { @@ -492,7 +492,6 @@ struct NewVertexLabel { }; struct CreateVerticesRequest { - std::string label; Hlc transaction_id; std::vector new_vertices; }; diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 2fb8f685f..e0d7133d2 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -23,6 +23,7 @@ #include #include +#include "coordinator/coordinator.hpp" #include "coordinator/coordinator_client.hpp" #include "coordinator/coordinator_rsm.hpp" #include "coordinator/shard_map.hpp" @@ -117,12 +118,9 @@ class ShardRequestManagerInterface { template class ShardRequestManager : public ShardRequestManagerInterface { public: - using WriteRequests = CreateVerticesRequest; - using WriteResponses = CreateVerticesResponse; - using ReadRequests = std::variant; - using ReadResponses = std::variant; using StorageClient = memgraph::coordinator::RsmClient; + using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests; using CoordinatorClient = memgraph::coordinator::CoordinatorClient; using Address = memgraph::io::Address; using Shard = memgraph::coordinator::Shard; @@ -141,7 +139,8 @@ class ShardRequestManager : public 
ShardRequestManagerInterface { void StartTransaction() override { memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; - auto write_res = coord_cli_.SendWriteRequest(req); + CoordinatorWriteRequests write_req = req; + auto write_res = coord_cli_.SendWriteRequest(write_req); if (write_res.HasError()) { throw std::runtime_error("HLC request failed"); } @@ -180,13 +179,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { *state.label, storage::conversions::ConvertPropertyVector(state.requests[id].start_id.second)); // TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture // instead. - auto read_response_result = storage_client.SendReadRequest(state.requests[id]); + ReadRequests req = state.requests[id]; + auto read_response_result = storage_client.SendReadRequest(req); // RETRY on timeouts? // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test if (read_response_result.HasError()) { throw std::runtime_error("ScanAll request timedout"); } - auto &response = std::get(read_response_result.GetValue()); + ReadResponses read_response_variant = read_response_result.GetValue(); + auto &response = std::get(read_response_variant); if (!response.success) { throw std::runtime_error("ScanAll request did not succeed"); } @@ -215,18 +216,26 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { // This is fine because all new_vertices of each request end up on the same shard const auto labels = state.requests[id].new_vertices[0].label_ids; + for (auto &new_vertex : state.requests[id].new_vertices) { + new_vertex.label_ids.erase(new_vertex.label_ids.begin()); + } auto primary_key = state.requests[id].new_vertices[0].primary_key; auto &storage_client = GetStorageClientForShard(*shard_it, labels[0].id); - auto 
write_response_result = storage_client.SendWriteRequest(state.requests[id]); + WriteRequests req = state.requests[id]; + [[maybe_unused]] auto create_vertices_request = std::get(req); + auto write_response_result = storage_client.SendWriteRequest(req); // RETRY on timeouts? // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test if (write_response_result.HasError()) { throw std::runtime_error("CreateVertices request timedout"); } - if (!write_response_result.GetValue().success) { + WriteResponses response_variant = write_response_result.GetValue(); + CreateVerticesResponse mapped_response = std::get(response_variant); + + if (!mapped_response.success) { throw std::runtime_error("CreateVertices request did not succeed"); } - responses.push_back(write_response_result.GetValue()); + responses.push_back(mapped_response); shard_it = shard_cache_ref.erase(shard_it); } // We are done with this state @@ -250,7 +259,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { const Label primary_label = state.requests[id].src_vertices[0].first; auto &storage_client = GetStorageClientForShard(*shard_it, primary_label.id); - auto read_response_result = storage_client.SendReadRequest(state.requests[id]); + ReadRequests req = state.requests[id]; + auto read_response_result = storage_client.SendReadRequest(req); // RETRY on timeouts? // Sometimes this produces a timeout.
Temporary solution is to use a while(true) as was done in shard_map if (read_response_result.HasError()) { @@ -303,6 +313,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { std::map per_shard_request_table; for (auto &new_vertex : new_vertices) { + MG_ASSERT(!new_vertex.label_ids.empty(), "A new vertex must be assigned at least one label"); auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id, storage::conversions::ConvertPropertyVector(new_vertex.primary_key)); if (!per_shard_request_table.contains(shard)) { diff --git a/src/storage/v3/shard.cpp b/src/storage/v3/shard.cpp index 94aff2b62..083455a25 100644 --- a/src/storage/v3/shard.cpp +++ b/src/storage/v3/shard.cpp @@ -320,7 +320,7 @@ bool VerticesIterable::Iterator::operator==(const Iterator &other) const { } Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key, - const std::optional max_primary_key, Config config) + const std::optional max_primary_key, std::vector schema, Config config) : primary_label_{primary_label}, min_primary_key_{min_primary_key}, max_primary_key_{max_primary_key}, @@ -331,7 +331,9 @@ Shard::Shard(const LabelId primary_label, const PrimaryKey min_primary_key, config_{config}, uuid_{utils::GenerateUUID()}, epoch_id_{utils::GenerateUUID()}, - global_locker_{file_retainer_.AddLocker()} {} + global_locker_{file_retainer_.AddLocker()} { + CreateSchema(primary_label_, schema); +} Shard::~Shard() {} diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index 8ebccf5c2..4b0b9d93c 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -190,7 +190,7 @@ class Shard final { /// @throw std::system_error /// @throw std::bad_alloc explicit Shard(LabelId primary_label, PrimaryKey min_primary_key, std::optional max_primary_key, - Config config = Config()); + std::vector schema, Config config = Config()); Shard(const Shard &) = delete; Shard(Shard &&) noexcept = delete; diff --git a/src/storage/v3/shard_manager.hpp b/src/storage/v3/shard_manager.hpp index
6288c011d..e682646e0 100644 --- a/src/storage/v3/shard_manager.hpp +++ b/src/storage/v3/shard_manager.hpp @@ -211,7 +211,7 @@ class ShardManager { std::vector
rsm_peers = {}; std::unique_ptr shard = - std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, to_init.config); + std::make_unique(to_init.label_id, to_init.min_key, to_init.max_key, to_init.schema, to_init.config); ShardRsm rsm_state{std::move(shard)}; diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 028eed26a..24a61a21c 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -432,4 +432,4 @@ target_link_libraries(${test_prefix}local_transport mg-io) # Test MachineManager with LocalTransport add_unit_test(machine_manager.cpp) -target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3) +target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-storage-v3 mg-query-v2) diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 93f2cef23..118e9d34b 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -27,6 +27,7 @@ #include #include #include "io/rsm/rsm_client.hpp" +#include "query/v2/shard_request_manager.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/schemas.hpp" @@ -105,6 +106,37 @@ ShardMap TestShardMap() { return sm; } +template +void TestScanAll(ShardRequestManager &shard_request_manager) { + msgs::ExecutionState state{.label = "test_label"}; + + auto result = shard_request_manager.Request(state); + MG_ASSERT(result.size() == 2, "{}", result.size()); +} + +template +void TestCreateVertices(ShardRequestManager &shard_request_manager) { + using PropVal = msgs::Value; + msgs::ExecutionState state; + std::vector new_vertices; + auto label_id = shard_request_manager.LabelNameToLabelId("test_label"); + msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; + a1.label_ids.push_back({label_id}); + msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + a2.label_ids.push_back({label_id}); + new_vertices.push_back(std::move(a1)); + 
new_vertices.push_back(std::move(a2)); + + auto result = shard_request_manager.Request(state, std::move(new_vertices)); + MG_ASSERT(result.size() == 1); +} + +template +void TestExpand(ShardRequestManager &shard_request_manager) {} + +template +void TestAggregate(ShardRequestManager &shard_request_manager) {} + MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, ShardMap shard_map) { MachineConfig config{ @@ -128,6 +160,9 @@ void WaitForShardsToInitialize(CoordinatorClient &cc) { // TODO(tyler) call coordinator client's read method for GetShardMap // and keep reading it until the shard map contains proper replicas // for each shard in the label space. + + using namespace std::chrono_literals; + std::this_thread::sleep_for(2010ms); } TEST(MachineManager, BasicFunctionality) { @@ -154,61 +189,13 @@ TEST(MachineManager, BasicFunctionality) { WaitForShardsToInitialize(cc); - using namespace std::chrono_literals; - std::this_thread::sleep_for(2010ms); + CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); - // get ShardMap from coordinator - memgraph::coordinator::HlcRequest req; - req.last_shard_map_version = Hlc{ - .logical_id = 0, - }; - - BasicResult read_res = cc.SendWriteRequest(req); - MG_ASSERT(!read_res.HasError(), "HLC request unexpectedly timed out"); - - auto coordinator_read_response = read_res.GetValue(); - HlcResponse hlc_response = std::get(coordinator_read_response); - ShardMap sm = hlc_response.fresher_shard_map.value(); - - // Get shard for key and create rsm client - const auto cm_key_1 = memgraph::storage::v3::PropertyValue(3); - const auto cm_key_2 = memgraph::storage::v3::PropertyValue(4); - - const CompoundKey compound_key = {cm_key_1, cm_key_2}; - - std::string label_name = "test_label"; - - Shard shard_for_key = sm.GetShardForKey(label_name, compound_key); - - auto shard_for_client = std::vector
{}; - - for (const auto &aas : shard_for_key) { - spdlog::info("got address for shard: {}", aas.address.ToString()); - shard_for_client.push_back(aas.address); - } - - ShardClient shard_client{cli_io, shard_for_client[0], shard_for_client}; - - // submit a read request and assert that the requested key does not yet exist - - LabelId label_id = sm.labels.at(label_name); - - ReadRequests storage_get_req; - /* - TODO(tyler,kostas) set this to a real request - storage_get_req.label_id = label_id; - storage_get_req.key = compound_key; - storage_get_req.transaction_id = hlc_response.new_hlc; - */ - - auto get_response_result = shard_client.SendReadRequest(storage_get_req); - auto get_response = get_response_result.GetValue(); - /* - auto val = get_response.value; - - MG_ASSERT(!val.has_value()); - */ + msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + shard_request_manager.StartTransaction(); + TestCreateVertices(shard_request_manager); + TestScanAll(shard_request_manager); local_system.ShutDown(); };