From 332afadf21b13e48ffa87595b93b943eeebe96f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <antaljanosbenjamin@users.noreply.github.com> Date: Tue, 25 Oct 2022 10:27:13 +0200 Subject: [PATCH 1/2] Split file parsing (#600) Add temporary support for split files. This is only a temporary solution until we get the shard splitting implemented. --- src/coordinator/shard_map.cpp | 433 +++++++++++++++++++++++++++ src/coordinator/shard_map.hpp | 306 ++----------------- src/memgraph.cpp | 28 +- src/query/v2/CMakeLists.txt | 2 +- tests/simulation/shard_rsm.cpp | 1 - tests/unit/CMakeLists.txt | 45 +-- tests/unit/coordinator_shard_map.cpp | 104 +++++++ 7 files changed, 604 insertions(+), 315 deletions(-) create mode 100644 tests/unit/coordinator_shard_map.cpp diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 88f878b4e..87b449301 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -9,8 +9,17 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. 
+#include <unordered_map> +#include <vector> + +#include "common/types.hpp" #include "coordinator/shard_map.hpp" +#include "spdlog/spdlog.h" +#include "storage/v3/schemas.hpp" #include "storage/v3/temporal.hpp" +#include "utils/cast.hpp" +#include "utils/exceptions.hpp" +#include "utils/string.hpp" namespace memgraph::coordinator { @@ -57,6 +66,259 @@ PrimaryKey SchemaToMinKey(const std::vector<SchemaProperty> &schema) { return ret; } +ShardMap ShardMap::Parse(std::istream &input_stream) { + ShardMap shard_map; + const auto read_size = [&input_stream] { + size_t size{0}; + input_stream >> size; + return size; + }; + + // Reads a string until the next whitespace + const auto read_word = [&input_stream] { + std::string word; + input_stream >> word; + return word; + }; + + const auto read_names = [&read_size, &read_word] { + const auto number_of_names = read_size(); + spdlog::trace("Reading {} names", number_of_names); + std::vector<std::string> names; + names.reserve(number_of_names); + + for (auto name_index = 0; name_index < number_of_names; ++name_index) { + names.push_back(read_word()); + spdlog::trace("Read '{}'", names.back()); + } + return names; + }; + + const auto read_line = [&input_stream] { + std::string line; + std::getline(input_stream, line); + return line; + }; + + const auto parse_type = [](const std::string &type) { + static const auto type_map = std::unordered_map<std::string, common::SchemaType>{ + {"string", common::SchemaType::STRING}, {"int", common::SchemaType::INT}, {"bool", common::SchemaType::BOOL}}; + const auto lower_case_type = utils::ToLowerCase(type); + auto it = type_map.find(lower_case_type); + MG_ASSERT(it != type_map.end(), "Invalid type in split files: {}", type); + return it->second; + }; + + const auto parse_property_value = [](std::string text, const common::SchemaType type) { + if (type == common::SchemaType::STRING) { + return storage::v3::PropertyValue{std::move(text)}; + } + if (type == common::SchemaType::INT) { + size_t 
processed{0}; + int64_t value = std::stoll(text, &processed); + MG_ASSERT(processed == text.size() || text[processed] == ' ', "Invalid integer format: '{}'", text); + return storage::v3::PropertyValue{value}; + } + LOG_FATAL("Not supported type: {}", utils::UnderlyingCast(type)); + }; + + spdlog::debug("Reading properties"); + const auto properties = read_names(); + MG_ASSERT(shard_map.AllocatePropertyIds(properties).size() == properties.size(), + "Unexpected number of properties created!"); + + spdlog::debug("Reading edge types"); + const auto edge_types = read_names(); + MG_ASSERT(shard_map.AllocateEdgeTypeIds(edge_types).size() == edge_types.size(), + "Unexpected number of properties created!"); + + spdlog::debug("Reading primary labels"); + const auto number_of_primary_labels = read_size(); + spdlog::debug("Reading {} primary labels", number_of_primary_labels); + + for (auto label_index = 0; label_index < number_of_primary_labels; ++label_index) { + const auto primary_label = read_word(); + spdlog::debug("Reading primary label named '{}'", primary_label); + const auto number_of_primary_properties = read_size(); + spdlog::debug("Reading {} primary properties", number_of_primary_properties); + std::vector<std::string> pp_names; + std::vector<common::SchemaType> pp_types; + pp_names.reserve(number_of_primary_properties); + pp_types.reserve(number_of_primary_properties); + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + pp_names.push_back(read_word()); + spdlog::debug("Reading primary property named '{}'", pp_names.back()); + pp_types.push_back(parse_type(read_word())); + } + auto pp_mapping = shard_map.AllocatePropertyIds(pp_names); + std::vector<SchemaProperty> schema; + schema.reserve(number_of_primary_properties); + + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + schema.push_back(storage::v3::SchemaProperty{pp_mapping.at(pp_names[property_index]), 
pp_types[property_index]}); + } + const auto hlc = shard_map.GetHlc(); + MG_ASSERT(shard_map.InitializeNewLabel(primary_label, schema, 1, hlc).has_value(), + "Cannot initialize new label: {}", primary_label); + + const auto number_of_split_points = read_size(); + spdlog::debug("Reading {} split points", number_of_split_points); + + [[maybe_unused]] const auto remainder_from_last_line = read_line(); + for (auto split_point_index = 0; split_point_index < number_of_split_points; ++split_point_index) { + const auto line = read_line(); + spdlog::debug("Read split point '{}'", line); + MG_ASSERT(line.front() == '[', "Invalid split file format!"); + MG_ASSERT(line.back() == ']', "Invalid split file format!"); + std::string_view line_view{line}; + line_view.remove_prefix(1); + line_view.remove_suffix(1); + static constexpr std::string_view kDelimiter{","}; + auto pk_values_as_text = utils::Split(line_view, kDelimiter); + std::vector<PropertyValue> pk; + pk.reserve(number_of_primary_properties); + MG_ASSERT(pk_values_as_text.size() == number_of_primary_properties, + "Split point contains invalid number of values '{}'", line); + + for (auto property_index = 0; property_index < number_of_primary_properties; ++property_index) { + pk.push_back(parse_property_value(std::move(pk_values_as_text[property_index]), schema[property_index].type)); + } + shard_map.SplitShard(shard_map.GetHlc(), shard_map.labels.at(primary_label), pk); + } + } + + return shard_map; +} + +std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { + using utils::print_helpers::operator<<; + + in << "ShardMap { shard_map_version: " << shard_map.shard_map_version; + in << ", max_property_id: " << shard_map.max_property_id; + in << ", max_edge_type_id: " << shard_map.max_edge_type_id; + in << ", properties: " << shard_map.properties; + in << ", edge_types: " << shard_map.edge_types; + in << ", max_label_id: " << shard_map.max_label_id; + in << ", labels: " << shard_map.labels; + in << ", 
label_spaces: " << shard_map.label_spaces; + in << ", schemas: " << shard_map.schemas; + in << "}"; + return in; +} + +Shards ShardMap::GetShards(const LabelName &label) { + const auto id = labels.at(label); + auto &shards = label_spaces.at(id).shards; + return shards; +} + +// TODO(gabor) later we will want to update the wallclock time with +// the given Io<impl>'s time as well +Hlc ShardMap::IncrementShardMapVersion() noexcept { + ++shard_map_version.logical_id; + return shard_map_version; +} + +Hlc ShardMap::GetHlc() const noexcept { return shard_map_version; } + +std::vector<ShardToInitialize> ShardMap::AssignShards(Address storage_manager, + std::set<boost::uuids::uuid> initialized) { + std::vector<ShardToInitialize> ret{}; + + bool mutated = false; + + for (auto &[label_id, label_space] : label_spaces) { + for (auto it = label_space.shards.begin(); it != label_space.shards.end(); it++) { + auto &[low_key, shard] = *it; + std::optional<PrimaryKey> high_key; + if (const auto next_it = std::next(it); next_it != label_space.shards.end()) { + high_key = next_it->first; + } + // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info + bool machine_contains_shard = false; + + for (auto &aas : shard) { + if (initialized.contains(aas.address.unique_id)) { + machine_contains_shard = true; + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); + aas.status = Status::CONSENSUS_PARTICIPANT; + } + } else { + const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && + aas.address.last_known_port == storage_manager.last_known_port; + if (same_machine) { + machine_contains_shard = true; + spdlog::info("reminding shard manager that they should begin participating in shard"); + ret.push_back(ShardToInitialize{ + .uuid = aas.address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = high_key, + .schema = 
schemas[label_id], + .config = Config{}, + }); + } + } + } + + if (!machine_contains_shard && shard.size() < label_space.replication_factor) { + Address address = storage_manager; + + // TODO(tyler) use deterministic UUID so that coordinators don't diverge here + address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, + + spdlog::info("assigning shard manager to shard"); + + ret.push_back(ShardToInitialize{ + .uuid = address.unique_id, + .label_id = label_id, + .min_key = low_key, + .max_key = high_key, + .schema = schemas[label_id], + .config = Config{}, + }); + + AddressAndStatus aas = { + .address = address, + .status = Status::INITIALIZING, + }; + + shard.emplace_back(aas); + } + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + return ret; +} + +bool ShardMap::SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) { + if (previous_shard_map_version != shard_map_version) { + return false; + } + + auto &label_space = label_spaces.at(label_id); + auto &shards_in_map = label_space.shards; + + MG_ASSERT(!shards_in_map.empty()); + MG_ASSERT(!shards_in_map.contains(key)); + MG_ASSERT(label_spaces.contains(label_id)); + + // Finding the Shard that the new PrimaryKey should map to. 
+ auto prev = std::prev(shards_in_map.upper_bound(key)); + Shard duplicated_shard = prev->second; + + // Apply the split + shards_in_map[key] = duplicated_shard; + + IncrementShardMapVersion(); + + return true; +} + std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema, size_t replication_factor, Hlc last_shard_map_version) { if (shard_map_version != last_shard_map_version || labels.contains(label_name)) { @@ -88,4 +350,175 @@ std::optional<LabelId> ShardMap::InitializeNewLabel(std::string label_name, std: return label_id; } +void ShardMap::AddServer(Address server_address) { + // Find a random place for the server to plug in +} +std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const { + if (const auto it = labels.find(label); it != labels.end()) { + return it->second; + } + + return std::nullopt; +} + +std::string ShardMap::GetLabelName(const LabelId label) const { + if (const auto it = + std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); + it != labels.end()) { + return it->first; + } + throw utils::BasicException("GetLabelName fails on the given label id!"); +} + +std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_name) const { + if (const auto it = properties.find(property_name); it != properties.end()) { + return it->second; + } + + return std::nullopt; +} + +std::string ShardMap::GetPropertyName(const PropertyId property) const { + if (const auto it = std::ranges::find_if( + properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); + it != properties.end()) { + return it->first; + } + throw utils::BasicException("PropertyId not found!"); +} + +std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type) const { + if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { + return it->second; + } + + return std::nullopt; +} + 
+std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { + if (const auto it = std::ranges::find_if( + edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); + it != edge_types.end()) { + return it->first; + } + throw utils::BasicException("EdgeTypeId not found!"); +} +Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, + const PrimaryKey &end_key) const { + MG_ASSERT(start_key <= end_key); + MG_ASSERT(labels.contains(label_name)); + + LabelId label_id = labels.at(label_name); + + const auto &label_space = label_spaces.at(label_id); + + const auto &shards_for_label = label_space.shards; + + MG_ASSERT(shards_for_label.begin()->first <= start_key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + auto it = std::prev(shards_for_label.upper_bound(start_key)); + const auto end_it = shards_for_label.upper_bound(end_key); + + Shards shards{}; + + std::copy(it, end_it, std::inserter(shards, shards.end())); + + return shards; +} + +Shard ShardMap::GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const { + MG_ASSERT(labels.contains(label_name)); + + LabelId label_id = labels.at(label_name); + + const auto &label_space = label_spaces.at(label_id); + + MG_ASSERT(label_space.shards.begin()->first <= key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + return std::prev(label_space.shards.upper_bound(key))->second; +} + +Shard ShardMap::GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const { + MG_ASSERT(label_spaces.contains(label_id)); + + const auto &label_space = label_spaces.at(label_id); + + MG_ASSERT(label_space.shards.begin()->first <= key, + "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); + + return std::prev(label_space.shards.upper_bound(key))->second; +} + +PropertyMap 
ShardMap::AllocatePropertyIds(const std::vector<PropertyName> &new_properties) { + PropertyMap ret{}; + + bool mutated = false; + + for (const auto &property_name : new_properties) { + if (properties.contains(property_name)) { + auto property_id = properties.at(property_name); + ret.emplace(property_name, property_id); + } else { + mutated = true; + + const PropertyId property_id = PropertyId::FromUint(++max_property_id); + ret.emplace(property_name, property_id); + properties.emplace(property_name, property_id); + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + + return ret; +} + +EdgeTypeIdMap ShardMap::AllocateEdgeTypeIds(const std::vector<EdgeTypeName> &new_edge_types) { + EdgeTypeIdMap ret; + + bool mutated = false; + + for (const auto &edge_type_name : new_edge_types) { + if (edge_types.contains(edge_type_name)) { + auto edge_type_id = edge_types.at(edge_type_name); + ret.emplace(edge_type_name, edge_type_id); + } else { + mutated = true; + + const EdgeTypeId edge_type_id = EdgeTypeId::FromUint(++max_edge_type_id); + ret.emplace(edge_type_name, edge_type_id); + edge_types.emplace(edge_type_name, edge_type_id); + } + } + + if (mutated) { + IncrementShardMapVersion(); + } + + return ret; +} + +bool ShardMap::ClusterInitialized() const { + for (const auto &[label_id, label_space] : label_spaces) { + for (const auto &[low_key, shard] : label_space.shards) { + if (shard.size() < label_space.replication_factor) { + spdlog::info("label_space below desired replication factor"); + return false; + } + + for (const auto &aas : shard) { + if (aas.status != Status::CONSENSUS_PARTICIPANT) { + spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT"); + return false; + } + } + } + } + + return true; +} + } // namespace memgraph::coordinator diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index aa0f13d09..b637e2300 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -96,6 +96,7 @@ PrimaryKey 
SchemaToMinKey(const std::vector<SchemaProperty> &schema); struct LabelSpace { std::vector<SchemaProperty> schema; + // Maps between the smallest primary key stored in the shard and the shard std::map<PrimaryKey, Shard> shards; size_t replication_factor; @@ -123,311 +124,48 @@ struct ShardMap { std::map<LabelId, LabelSpace> label_spaces; std::map<LabelId, std::vector<SchemaProperty>> schemas; - friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { - using utils::print_helpers::operator<<; + [[nodiscard]] static ShardMap Parse(std::istream &input_stream); + friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map); - in << "ShardMap { shard_map_version: " << shard_map.shard_map_version; - in << ", max_property_id: " << shard_map.max_property_id; - in << ", max_edge_type_id: " << shard_map.max_edge_type_id; - in << ", properties: " << shard_map.properties; - in << ", edge_types: " << shard_map.edge_types; - in << ", max_label_id: " << shard_map.max_label_id; - in << ", labels: " << shard_map.labels; - in << ", label_spaces: " << shard_map.label_spaces; - in << ", schemas: " << shard_map.schemas; - in << "}"; - return in; - } - - Shards GetShards(const LabelName &label) { - const auto id = labels.at(label); - auto &shards = label_spaces.at(id).shards; - return shards; - } + Shards GetShards(const LabelName &label); // TODO(gabor) later we will want to update the wallclock time with // the given Io<impl>'s time as well - Hlc IncrementShardMapVersion() noexcept { - ++shard_map_version.logical_id; - return shard_map_version; - } - - Hlc GetHlc() const noexcept { return shard_map_version; } + Hlc IncrementShardMapVersion() noexcept; + Hlc GetHlc() const noexcept; // Returns the shard UUIDs that have been assigned but not yet acknowledged for this storage manager - std::vector<ShardToInitialize> AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized) { - std::vector<ShardToInitialize> ret{}; + 
std::vector<ShardToInitialize> AssignShards(Address storage_manager, std::set<boost::uuids::uuid> initialized); - bool mutated = false; - - for (auto &[label_id, label_space] : label_spaces) { - for (auto it = label_space.shards.begin(); it != label_space.shards.end(); it++) { - auto &[low_key, shard] = *it; - std::optional<PrimaryKey> high_key; - if (const auto next_it = std::next(it); next_it != label_space.shards.end()) { - high_key = next_it->first; - } - // TODO(tyler) avoid these triple-nested loops by having the heartbeat include better info - bool machine_contains_shard = false; - - for (auto &aas : shard) { - if (initialized.contains(aas.address.unique_id)) { - machine_contains_shard = true; - if (aas.status != Status::CONSENSUS_PARTICIPANT) { - spdlog::info("marking shard as full consensus participant: {}", aas.address.unique_id); - aas.status = Status::CONSENSUS_PARTICIPANT; - } - } else { - const bool same_machine = aas.address.last_known_ip == storage_manager.last_known_ip && - aas.address.last_known_port == storage_manager.last_known_port; - if (same_machine) { - machine_contains_shard = true; - spdlog::info("reminding shard manager that they should begin participating in shard"); - ret.push_back(ShardToInitialize{ - .uuid = aas.address.unique_id, - .label_id = label_id, - .min_key = low_key, - .max_key = high_key, - .schema = schemas[label_id], - .config = Config{}, - }); - } - } - } - - if (!machine_contains_shard && shard.size() < label_space.replication_factor) { - Address address = storage_manager; - - // TODO(tyler) use deterministic UUID so that coordinators don't diverge here - address.unique_id = boost::uuids::uuid{boost::uuids::random_generator()()}, - - spdlog::info("assigning shard manager to shard"); - - ret.push_back(ShardToInitialize{ - .uuid = address.unique_id, - .label_id = label_id, - .min_key = low_key, - .max_key = high_key, - .schema = schemas[label_id], - .config = Config{}, - }); - - AddressAndStatus aas = { - .address = 
address, - .status = Status::INITIALIZING, - }; - - shard.emplace_back(aas); - } - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } - - bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key) { - if (previous_shard_map_version != shard_map_version) { - return false; - } - - auto &label_space = label_spaces.at(label_id); - auto &shards_in_map = label_space.shards; - - MG_ASSERT(!shards_in_map.empty()); - MG_ASSERT(!shards_in_map.contains(key)); - MG_ASSERT(label_spaces.contains(label_id)); - - // Finding the Shard that the new PrimaryKey should map to. - auto prev = std::prev(shards_in_map.upper_bound(key)); - Shard duplicated_shard = prev->second; - - // Apply the split - shards_in_map[key] = duplicated_shard; - - return true; - } + bool SplitShard(Hlc previous_shard_map_version, LabelId label_id, const PrimaryKey &key); std::optional<LabelId> InitializeNewLabel(std::string label_name, std::vector<SchemaProperty> schema, size_t replication_factor, Hlc last_shard_map_version); - void AddServer(Address server_address) { - // Find a random place for the server to plug in - } + void AddServer(Address server_address); - std::optional<LabelId> GetLabelId(const std::string &label) const { - if (const auto it = labels.find(label); it != labels.end()) { - return it->second; - } + std::optional<LabelId> GetLabelId(const std::string &label) const; + // TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper + std::string GetLabelName(LabelId label) const; + std::optional<PropertyId> GetPropertyId(const std::string &property_name) const; + std::string GetPropertyName(PropertyId property) const; + std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const; + std::string GetEdgeTypeName(EdgeTypeId property) const; - return std::nullopt; - } + Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const; - std::string GetLabelName(const 
LabelId label) const { - if (const auto it = - std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); - it != labels.end()) { - return it->first; - } - throw utils::BasicException("GetLabelName fails on the given label id!"); - } + Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const; - std::optional<PropertyId> GetPropertyId(const std::string &property_name) const { - if (const auto it = properties.find(property_name); it != properties.end()) { - return it->second; - } + Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const; - return std::nullopt; - } + PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties); - std::string GetPropertyName(const PropertyId property) const { - if (const auto it = std::ranges::find_if( - properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); - it != properties.end()) { - return it->first; - } - throw utils::BasicException("PropertyId not found!"); - } - - std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const { - if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { - return it->second; - } - - return std::nullopt; - } - - std::string GetEdgeTypeName(const EdgeTypeId property) const { - if (const auto it = std::ranges::find_if( - edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); - it != edge_types.end()) { - return it->first; - } - throw utils::BasicException("EdgeTypeId not found!"); - } - - Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const { - MG_ASSERT(start_key <= end_key); - MG_ASSERT(labels.contains(label_name)); - - LabelId label_id = labels.at(label_name); - - const auto &label_space = label_spaces.at(label_id); - - const auto &shards_for_label = label_space.shards; - - MG_ASSERT(shards_for_label.begin()->first <= 
start_key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - auto it = std::prev(shards_for_label.upper_bound(start_key)); - const auto end_it = shards_for_label.upper_bound(end_key); - - Shards shards{}; - - std::copy(it, end_it, std::inserter(shards, shards.end())); - - return shards; - } - - Shard GetShardForKey(const LabelName &label_name, const PrimaryKey &key) const { - MG_ASSERT(labels.contains(label_name)); - - LabelId label_id = labels.at(label_name); - - const auto &label_space = label_spaces.at(label_id); - - MG_ASSERT(label_space.shards.begin()->first <= key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - return std::prev(label_space.shards.upper_bound(key))->second; - } - - Shard GetShardForKey(const LabelId &label_id, const PrimaryKey &key) const { - MG_ASSERT(label_spaces.contains(label_id)); - - const auto &label_space = label_spaces.at(label_id); - - MG_ASSERT(label_space.shards.begin()->first <= key, - "the ShardMap must always contain a minimal key that is less than or equal to any requested key"); - - return std::prev(label_space.shards.upper_bound(key))->second; - } - - PropertyMap AllocatePropertyIds(const std::vector<PropertyName> &new_properties) { - PropertyMap ret{}; - - bool mutated = false; - - for (const auto &property_name : new_properties) { - if (properties.contains(property_name)) { - auto property_id = properties.at(property_name); - ret.emplace(property_name, property_id); - } else { - mutated = true; - - const PropertyId property_id = PropertyId::FromUint(++max_property_id); - ret.emplace(property_name, property_id); - properties.emplace(property_name, property_id); - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } - - EdgeTypeIdMap AllocateEdgeTypeIds(const std::vector<EdgeTypeName> &new_edge_types) { - EdgeTypeIdMap ret; - - bool mutated = false; - - for (const auto &edge_type_name : 
new_edge_types) { - if (edge_types.contains(edge_type_name)) { - auto edge_type_id = edge_types.at(edge_type_name); - ret.emplace(edge_type_name, edge_type_id); - } else { - mutated = true; - - const EdgeTypeId edge_type_id = EdgeTypeId::FromUint(++max_edge_type_id); - ret.emplace(edge_type_name, edge_type_id); - edge_types.emplace(edge_type_name, edge_type_id); - } - } - - if (mutated) { - IncrementShardMapVersion(); - } - - return ret; - } + EdgeTypeIdMap AllocateEdgeTypeIds(const std::vector<EdgeTypeName> &new_edge_types); /// Returns true if all shards have the desired number of replicas and they are in /// the CONSENSUS_PARTICIPANT state. Note that this does not necessarily mean that /// there is also an active leader for each shard. - bool ClusterInitialized() const { - for (const auto &[label_id, label_space] : label_spaces) { - for (const auto &[low_key, shard] : label_space.shards) { - if (shard.size() < label_space.replication_factor) { - spdlog::info("label_space below desired replication factor"); - return false; - } - - for (const auto &aas : shard) { - if (aas.status != Status::CONSENSUS_PARTICIPANT) { - spdlog::info("shard member not yet a CONSENSUS_PARTICIPANT"); - return false; - } - } - } - } - - return true; - } + bool ClusterInitialized() const; }; } // namespace memgraph::coordinator diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 9c4d71e94..c696615b5 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -16,6 +16,7 @@ #include <cstdint> #include <exception> #include <filesystem> +#include <fstream> #include <functional> #include <limits> #include <map> @@ -265,6 +266,10 @@ DEFINE_uint64( "Total memory limit in MiB. 
Set to 0 to use the default values which are 100\% of the phyisical memory if the swap " "is enabled and 90\% of the physical memory otherwise."); +// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables) +DEFINE_string(split_file, "", + "Path to the split file which contains the predefined labels, properties, edge types and shard-ranges."); + namespace { using namespace std::literals; inline constexpr std::array isolation_level_mappings{ @@ -639,15 +644,22 @@ int main(int argc, char **argv) { .listen_port = unique_local_addr_query.last_known_port, }; - const std::string property{"property"}; - const std::string label{"label"}; memgraph::coordinator::ShardMap sm; - auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{property}); - auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"TO"}); - std::vector<memgraph::storage::v3::SchemaProperty> schema{{prop_map.at(property), memgraph::common::SchemaType::INT}}; - sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version); - sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label), - std::vector<memgraph::storage::v3::PropertyValue>{memgraph::storage::v3::PropertyValue{2}}); + if (FLAGS_split_file.empty()) { + const std::string property{"property"}; + const std::string label{"label"}; + auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{property}); + auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"TO"}); + std::vector<memgraph::storage::v3::SchemaProperty> schema{ + {prop_map.at(property), memgraph::common::SchemaType::INT}}; + sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version); + sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label), + std::vector<memgraph::storage::v3::PropertyValue>{memgraph::storage::v3::PropertyValue{2}}); + } else { + std::ifstream input{FLAGS_split_file, std::ios::in}; + MG_ASSERT(input.is_open(), "Cannot open split file to read: {}", FLAGS_split_file); + sm = memgraph::coordinator::ShardMap::Parse(input); + } 
memgraph::coordinator::Coordinator coordinator{sm}; diff --git a/src/query/v2/CMakeLists.txt b/src/query/v2/CMakeLists.txt index 03167341a..3c3f780c8 100644 --- a/src/query/v2/CMakeLists.txt +++ b/src/query/v2/CMakeLists.txt @@ -33,7 +33,7 @@ add_dependencies(mg-query-v2 generate_lcp_query_v2) target_include_directories(mg-query-v2 PUBLIC ${CMAKE_SOURCE_DIR}/include) target_include_directories(mg-query-v2 PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/bindings) target_link_libraries(mg-query-v2 dl cppitertools Boost::headers) -target_link_libraries(mg-query-v2 mg-integrations-pulsar mg-integrations-kafka mg-storage-v3 mg-license mg-utils mg-kvstore mg-memory) +target_link_libraries(mg-query-v2 mg-integrations-pulsar mg-integrations-kafka mg-storage-v3 mg-license mg-utils mg-kvstore mg-memory mg-coordinator) target_link_libraries(mg-query-v2 mg-expr) if(NOT "${MG_PYTHON_PATH}" STREQUAL "") diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index cadc66702..64d0a0861 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -1046,7 +1046,6 @@ void TestExpandOneGraphTwo(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); - auto wrong_edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); auto edge_gid_1 = GetUniqueInteger(); auto edge_gid_2 = GetUniqueInteger(); diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index d70c4d867..f3a95c1d3 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -333,36 +333,35 @@ target_link_libraries(${test_prefix}storage_v3_schema mg-storage-v3) # Test mg-query-v2 # These are commented out because of the new TypedValue in the query engine -#add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) -#target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication) +# 
add_unit_test(query_v2_interpreter.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) +# target_link_libraries(${test_prefix}query_v2_interpreter mg-storage-v3 mg-query-v2 mg-communication) # -#add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2) +# add_unit_test(query_v2_query_plan_accumulate_aggregate.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_accumulate_aggregate mg-query-v2) # -#add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr) +# add_unit_test(query_v2_query_plan_create_set_remove_delete.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_create_set_remove_delete mg-query-v2 mg-expr) # -#add_unit_test(query_v2_query_plan_bag_semantics.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2) +# add_unit_test(query_v2_query_plan_bag_semantics.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_bag_semantics mg-query-v2) # -#add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2) +# add_unit_test(query_v2_query_plan_edge_cases.cpp ${CMAKE_SOURCE_DIR}/src/glue/v2/communication.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_edge_cases mg-communication mg-query-v2) # -#add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp) -#target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2) +# add_unit_test(query_v2_query_plan_v2_create_set_remove_delete.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_v2_create_set_remove_delete mg-query-v2) # -#add_unit_test(query_v2_query_plan_match_filter_return.cpp) 
-#target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2) +# add_unit_test(query_v2_query_plan_match_filter_return.cpp) +# target_link_libraries(${test_prefix}query_v2_query_plan_match_filter_return mg-query-v2) # -#add_unit_test(query_v2_cypher_main_visitor.cpp) -#target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2) +# add_unit_test(query_v2_cypher_main_visitor.cpp) +# target_link_libraries(${test_prefix}query_v2_cypher_main_visitor mg-query-v2) # -#add_unit_test(query_v2_query_required_privileges.cpp) -#target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2) +# add_unit_test(query_v2_query_required_privileges.cpp) +# target_link_libraries(${test_prefix}query_v2_query_required_privileges mg-query-v2) # -#add_unit_test(replication_persistence_helper.cpp) -#target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) - +# add_unit_test(replication_persistence_helper.cpp) +# target_link_libraries(${test_prefix}replication_persistence_helper mg-storage-v2) add_unit_test(query_v2_dummy_test.cpp) target_link_libraries(${test_prefix}query_v2_dummy_test mg-query-v2) @@ -436,3 +435,7 @@ target_link_libraries(${test_prefix}machine_manager mg-io mg-coordinator mg-stor add_unit_test(pretty_print_ast_to_original_expression_test.cpp) target_link_libraries(${test_prefix}pretty_print_ast_to_original_expression_test mg-io mg-expr mg-query-v2) + +# Tests for mg-coordinator +add_unit_test(coordinator_shard_map.cpp) +target_link_libraries(${test_prefix}coordinator_shard_map mg-coordinator) diff --git a/tests/unit/coordinator_shard_map.cpp b/tests/unit/coordinator_shard_map.cpp new file mode 100644 index 000000000..4bfb43a24 --- /dev/null +++ b/tests/unit/coordinator_shard_map.cpp @@ -0,0 +1,104 @@ +// Copyright 2022 Memgraph Ltd. 
+// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +// License, and you may not use this file except in compliance with the Business Source License. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +#include <sstream> +#include <string> + +#include "common/types.hpp" +#include "coordinator/shard_map.hpp" +#include "gtest/gtest.h" +#include "storage/v3/id_types.hpp" +#include "storage/v3/property_value.hpp" +#include "storage/v3/schemas.hpp" + +namespace memgraph::coordinator::tests { +TEST(ShardMap, Parse) { + std::string input = R"(4 +property_1 +property_2 +property_3 +property_4 +3 +edge_type_1 +edge_type_2 +edge_type_3 +2 +label_1 +1 +primary_property_name_1 +string +4 +[asdasd] +[qweqwe] +[bnm] +[tryuryturtyur] +label_2 +3 +property_1 +string +property_2 +int +primary_property_name_2 +InT +2 +[first,1 ,2] +[ second ,-1, -9223372036854775808] +)"; + + std::stringstream stream(input); + auto shard_map = ShardMap::Parse(stream); + EXPECT_EQ(shard_map.properties.size(), 6); + EXPECT_EQ(shard_map.edge_types.size(), 3); + EXPECT_EQ(shard_map.label_spaces.size(), 2); + EXPECT_EQ(shard_map.schemas.size(), 2); + + auto check_label = [&shard_map](const std::string &label_name, const std::vector<SchemaProperty> &expected_schema, + const std::vector<PrimaryKey> &expected_split_points) { + ASSERT_TRUE(shard_map.labels.contains(label_name)); + const auto label_id = shard_map.labels.at(label_name); + const auto &schema = shard_map.schemas.at(label_id); + ASSERT_EQ(schema.size(), expected_schema.size()); + for (auto pp_index = 0; pp_index < schema.size(); ++pp_index) { + EXPECT_EQ(schema[pp_index].property_id, expected_schema[pp_index].property_id); + 
EXPECT_EQ(schema[pp_index].type, expected_schema[pp_index].type); + } + + const auto &label_space = shard_map.label_spaces.at(label_id); + + ASSERT_EQ(label_space.shards.size(), expected_split_points.size()); + for (const auto &split_point : expected_split_points) { + EXPECT_TRUE(label_space.shards.contains(split_point)) << split_point[0]; + } + }; + + check_label("label_1", + {SchemaProperty{shard_map.properties.at("primary_property_name_1"), common::SchemaType::STRING}}, + std::vector<PrimaryKey>{ + PrimaryKey{PropertyValue{""}}, + PrimaryKey{PropertyValue{"asdasd"}}, + PrimaryKey{PropertyValue{"qweqwe"}}, + PrimaryKey{PropertyValue{"bnm"}}, + PrimaryKey{PropertyValue{"tryuryturtyur"}}, + }); + + static constexpr int64_t kMinInt = std::numeric_limits<int64_t>::min(); + check_label("label_2", + {SchemaProperty{shard_map.properties.at("property_1"), common::SchemaType::STRING}, + SchemaProperty{shard_map.properties.at("property_2"), common::SchemaType::INT}, + SchemaProperty{shard_map.properties.at("primary_property_name_2"), common::SchemaType::INT}}, + std::vector<PrimaryKey>{ + PrimaryKey{PropertyValue{""}, PropertyValue{kMinInt}, PropertyValue{kMinInt}}, + PrimaryKey{PropertyValue{"first"}, PropertyValue{1}, PropertyValue{2}}, + PrimaryKey{PropertyValue{" second "}, PropertyValue{-1}, + PropertyValue{int64_t{-9223372036854775807LL - 1LL}}}, + }); +} +} // namespace memgraph::coordinator::tests From ca2351124b582866d0e96323d95535473484d5a6 Mon Sep 17 00:00:00 2001 From: gvolfing <107616712+gvolfing@users.noreply.github.com> Date: Tue, 25 Oct 2022 19:48:17 +0200 Subject: [PATCH 2/2] Make primary labels act as label indices (#605) Because of the lexicographical sharding, the primary labels themselves are acting as indexes. If a primary label is specified in a MATCH query we can safely narrow the range of shards we have to scan through based on that label. This PR introduces the necessary changes in order to achieve that. 
--- src/coordinator/shard_map.cpp | 18 ++++++-- src/coordinator/shard_map.hpp | 10 ++-- src/query/v2/plan/operator.cpp | 3 +- src/query/v2/plan/vertex_count_cache.hpp | 3 +- src/query/v2/shard_request_manager.hpp | 58 +++++++++++++++--------- 5 files changed, 59 insertions(+), 33 deletions(-) diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 87b449301..bd21dea3e 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -206,12 +206,20 @@ std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { return in; } -Shards ShardMap::GetShards(const LabelName &label) { +Shards ShardMap::GetShardsForLabel(const LabelName &label) const { const auto id = labels.at(label); - auto &shards = label_spaces.at(id).shards; + const auto &shards = label_spaces.at(id).shards; return shards; } +std::vector<Shards> ShardMap::GetAllShards() const { + std::vector<Shards> all_shards; + all_shards.reserve(label_spaces.size()); + std::transform(label_spaces.begin(), label_spaces.end(), std::back_inserter(all_shards), + [](const auto &label_space) { return label_space.second.shards; }); + return all_shards; +} + // TODO(gabor) later we will want to update the wallclock time with // the given Io<impl>'s time as well Hlc ShardMap::IncrementShardMapVersion() noexcept { @@ -361,7 +369,7 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const { return std::nullopt; } -std::string ShardMap::GetLabelName(const LabelId label) const { +const std::string &ShardMap::GetLabelName(const LabelId label) const { if (const auto it = std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); it != labels.end()) { @@ -378,7 +386,7 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na return std::nullopt; } -std::string ShardMap::GetPropertyName(const PropertyId property) const { +const std::string &ShardMap::GetPropertyName(const PropertyId 
property) const { if (const auto it = std::ranges::find_if( properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); it != properties.end()) { @@ -395,7 +403,7 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type) return std::nullopt; } -std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { +const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { if (const auto it = std::ranges::find_if( edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); it != edge_types.end()) { diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index b637e2300..63274aa76 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -127,7 +127,9 @@ struct ShardMap { [[nodiscard]] static ShardMap Parse(std::istream &input_stream); friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map); - Shards GetShards(const LabelName &label); + Shards GetShardsForLabel(const LabelName &label) const; + + std::vector<Shards> GetAllShards() const; // TODO(gabor) later we will want to update the wallclock time with // the given Io<impl>'s time as well @@ -146,11 +148,11 @@ struct ShardMap { std::optional<LabelId> GetLabelId(const std::string &label) const; // TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper - std::string GetLabelName(LabelId label) const; + const std::string &GetLabelName(LabelId label) const; std::optional<PropertyId> GetPropertyId(const std::string &property_name) const; - std::string GetPropertyName(PropertyId property) const; + const std::string &GetPropertyName(PropertyId property) const; std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const; - std::string GetEdgeTypeName(EdgeTypeId property) const; + const std::string &GetEdgeTypeName(EdgeTypeId property) const; Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey 
&start_key, const PrimaryKey &end_key) const; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 3c36559c9..4243c6bd1 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -381,6 +381,8 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } + request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + if (current_vertex_it == current_batch.end()) { if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) { ResetExecutionState(); @@ -399,7 +401,6 @@ class DistributedScanAllAndFilterCursor : public Cursor { current_batch.clear(); current_vertex_it = current_batch.end(); request_state_ = msgs::ExecutionState<msgs::ScanVerticesRequest>{}; - request_state_.label = "label"; } void Reset() override { diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index f1be8e1a1..a7bfbdf85 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -52,7 +52,8 @@ class VertexCountCache { return 1; } - bool LabelIndexExists(storage::v3::LabelId /*label*/) { return false; } + // For now return true if label is primary label + bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); } bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 4ee36ec4a..79794aa1a 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -129,6 +129,7 @@ class ShardRequestManagerInterface { virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; virtual const std::string 
&EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; + virtual bool IsPrimaryLabel(LabelId label) const = 0; virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0; }; @@ -222,17 +223,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { return shards_map_.GetLabelId(name).value(); } - const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override { - static std::string str{"dummy__prop"}; - return str; + const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override { + return shards_map_.GetPropertyName(prop); } - const std::string &LabelToName(memgraph::storage::v3::LabelId /*label*/) const override { - static std::string str{"dummy__label"}; - return str; + const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override { + return shards_map_.GetLabelName(label); } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId /*type*/) const override { - static std::string str{"dummy__edgetype"}; - return str; + const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override { + return shards_map_.GetEdgeTypeName(type); } bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { @@ -244,9 +242,10 @@ class ShardRequestManager : public ShardRequestManagerInterface { }) != schema_it->second.end(); } + bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); } + // TODO(kostasrim) Simplify return result std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override { - spdlog::info("shards_map_.size(): {}", shards_map_.GetShards(*state.label).size()); MaybeInitializeExecutionState(state); std::vector<ScanVerticesResponse> responses; @@ -455,15 +454,26 @@ class ShardRequestManager : public ShardRequestManagerInterface { if (ShallNotInitializeState(state)) { return; } + + std::vector<coordinator::Shards> 
multi_shards; state.transaction_id = transaction_id_; - auto shards = shards_map_.GetShards(*state.label); - for (auto &[key, shard] : shards) { - MG_ASSERT(!shard.empty()); - state.shard_cache.push_back(std::move(shard)); - ScanVerticesRequest rqst; - rqst.transaction_id = transaction_id_; - rqst.start_id.second = storage::conversions::ConvertValueVector(key); - state.requests.push_back(std::move(rqst)); + if (!state.label) { + multi_shards = shards_map_.GetAllShards(); + } else { + const auto label_id = shards_map_.GetLabelId(*state.label); + MG_ASSERT(label_id); + MG_ASSERT(IsPrimaryLabel(*label_id)); + multi_shards = {shards_map_.GetShardsForLabel(*state.label)}; + } + for (auto &shards : multi_shards) { + for (auto &[key, shard] : shards) { + MG_ASSERT(!shard.empty()); + state.shard_cache.push_back(std::move(shard)); + ScanVerticesRequest rqst; + rqst.transaction_id = transaction_id_; + rqst.start_id.second = storage::conversions::ConvertValueVector(key); + state.requests.push_back(std::move(rqst)); + } } state.state = ExecutionState<ScanVerticesRequest>::EXECUTING; } @@ -521,11 +531,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { } void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) { + int64_t shard_idx = 0; for (const auto &request : state.requests) { - auto &storage_client = - GetStorageClientForShard(*state.label, storage::conversions::ConvertPropertyVector(request.start_id.second)); + const auto &current_shard = state.shard_cache[shard_idx]; + + auto &storage_client = GetStorageClientForShard(current_shard); ReadRequests req = request; storage_client.SendAsyncReadRequest(request); + + ++shard_idx; } } @@ -647,8 +661,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { continue; } - auto &storage_client = GetStorageClientForShard( - *state.label, storage::conversions::ConvertPropertyVector(state.requests[request_idx].start_id.second)); + auto &storage_client = GetStorageClientForShard(*shard_it); + auto
await_result = storage_client.AwaitAsyncReadRequest(); if (!await_result) {