diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index 87b449301..bd21dea3e 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -206,12 +206,20 @@ std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) { return in; } -Shards ShardMap::GetShards(const LabelName &label) { +Shards ShardMap::GetShardsForLabel(const LabelName &label) const { const auto id = labels.at(label); - auto &shards = label_spaces.at(id).shards; + const auto &shards = label_spaces.at(id).shards; return shards; } +std::vector ShardMap::GetAllShards() const { + std::vector all_shards; + all_shards.reserve(label_spaces.size()); + std::transform(label_spaces.begin(), label_spaces.end(), std::back_inserter(all_shards), + [](const auto &label_space) { return label_space.second.shards; }); + return all_shards; +} + // TODO(gabor) later we will want to update the wallclock time with // the given Io's time as well Hlc ShardMap::IncrementShardMapVersion() noexcept { @@ -361,7 +369,7 @@ std::optional ShardMap::GetLabelId(const std::string &label) const { return std::nullopt; } -std::string ShardMap::GetLabelName(const LabelId label) const { +const std::string &ShardMap::GetLabelName(const LabelId label) const { if (const auto it = std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; }); it != labels.end()) { @@ -378,7 +386,7 @@ std::optional ShardMap::GetPropertyId(const std::string &property_na return std::nullopt; } -std::string ShardMap::GetPropertyName(const PropertyId property) const { +const std::string &ShardMap::GetPropertyName(const PropertyId property) const { if (const auto it = std::ranges::find_if( properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); it != properties.end()) { @@ -395,7 +403,7 @@ std::optional ShardMap::GetEdgeTypeId(const std::string &edge_type) return std::nullopt; } -std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { +const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { if (const auto it = std::ranges::find_if( edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; }); it != edge_types.end()) { diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index b637e2300..63274aa76 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -127,7 +127,9 @@ struct ShardMap { [[nodiscard]] static ShardMap Parse(std::istream &input_stream); friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map); - Shards GetShards(const LabelName &label); + Shards GetShardsForLabel(const LabelName &label) const; + + std::vector GetAllShards() const; // TODO(gabor) later we will want to update the wallclock time with // the given Io's time as well @@ -146,11 +148,11 @@ struct ShardMap { std::optional GetLabelId(const std::string &label) const; // TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper - std::string GetLabelName(LabelId label) const; + const std::string &GetLabelName(LabelId label) const; std::optional GetPropertyId(const std::string &property_name) const; - std::string GetPropertyName(PropertyId property) const; + const std::string &GetPropertyName(PropertyId property) const; std::optional GetEdgeTypeId(const std::string &edge_type) const; - std::string GetEdgeTypeName(EdgeTypeId property) const; + const std::string &GetEdgeTypeName(EdgeTypeId property) const; Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 3c36559c9..4243c6bd1 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -381,6 +381,8 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } + request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + if (current_vertex_it == current_batch.end()) { if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) { ResetExecutionState(); @@ -399,7 +401,6 @@ class DistributedScanAllAndFilterCursor : public Cursor { current_batch.clear(); current_vertex_it = current_batch.end(); request_state_ = msgs::ExecutionState{}; - request_state_.label = "label"; } void Reset() override { diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index f1be8e1a1..a7bfbdf85 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -52,7 +52,8 @@ class VertexCountCache { return 1; } - bool LabelIndexExists(storage::v3::LabelId /*label*/) { return false; } + // For now return true if label is primary label + bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); } bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 4ee36ec4a..79794aa1a 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -129,6 +129,7 @@ class ShardRequestManagerInterface { virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; + virtual bool IsPrimaryLabel(LabelId label) const = 0; virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0; }; @@ -222,17 +223,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { return shards_map_.GetLabelId(name).value(); } - const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override { - static std::string str{"dummy__prop"}; - return str; + const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override { + return shards_map_.GetPropertyName(prop); } - const std::string &LabelToName(memgraph::storage::v3::LabelId /*label*/) const override { - static std::string str{"dummy__label"}; - return str; + const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override { + return shards_map_.GetLabelName(label); } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId /*type*/) const override { - static std::string str{"dummy__edgetype"}; - return str; + const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override { + return shards_map_.GetEdgeTypeName(type); } bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { @@ -244,9 +242,10 @@ class ShardRequestManager : public ShardRequestManagerInterface { }) != schema_it->second.end(); } + bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); } + // TODO(kostasrim) Simplify return result std::vector Request(ExecutionState &state) override { - spdlog::info("shards_map_.size(): {}", shards_map_.GetShards(*state.label).size()); MaybeInitializeExecutionState(state); std::vector responses; @@ -455,15 +454,26 @@ class ShardRequestManager : public ShardRequestManagerInterface { if (ShallNotInitializeState(state)) { return; } + + std::vector multi_shards; state.transaction_id = transaction_id_; - auto shards = shards_map_.GetShards(*state.label); - for (auto &[key, shard] : shards) { - MG_ASSERT(!shard.empty()); - state.shard_cache.push_back(std::move(shard)); - ScanVerticesRequest rqst; - rqst.transaction_id = transaction_id_; - rqst.start_id.second = storage::conversions::ConvertValueVector(key); - state.requests.push_back(std::move(rqst)); + if (!state.label) { + multi_shards = shards_map_.GetAllShards(); + } else { + const auto label_id = shards_map_.GetLabelId(*state.label); + MG_ASSERT(label_id); + MG_ASSERT(IsPrimaryLabel(*label_id)); + multi_shards = {shards_map_.GetShardsForLabel(*state.label)}; + } + for (auto &shards : multi_shards) { + for (auto &[key, shard] : shards) { + MG_ASSERT(!shard.empty()); + state.shard_cache.push_back(std::move(shard)); + ScanVerticesRequest rqst; + rqst.transaction_id = transaction_id_; + rqst.start_id.second = storage::conversions::ConvertValueVector(key); + state.requests.push_back(std::move(rqst)); + } } state.state = ExecutionState::EXECUTING; } @@ -521,11 +531,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { } void SendAllRequests(ExecutionState &state) { + int64_t shard_idx = 0; for (const auto &request : state.requests) { - auto &storage_client = - GetStorageClientForShard(*state.label, storage::conversions::ConvertPropertyVector(request.start_id.second)); + const auto ¤t_shard = state.shard_cache[shard_idx]; + + auto &storage_client = GetStorageClientForShard(current_shard); ReadRequests req = request; storage_client.SendAsyncReadRequest(request); + + ++shard_idx; } } @@ -647,8 +661,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { continue; } - auto &storage_client = GetStorageClientForShard( - *state.label, storage::conversions::ConvertPropertyVector(state.requests[request_idx].start_id.second)); + auto &storage_client = GetStorageClientForShard(*shard_it); + auto await_result = storage_client.AwaitAsyncReadRequest(); if (!await_result) {