Make primary labels act as label indices (#605)
Because of the lexicographical sharding, the primary labels themselves act as indices. If a primary label is specified in a MATCH query, we can safely narrow the range of shards we have to scan based on that label. This PR introduces the changes necessary to achieve that.
This commit is contained in:
parent
332afadf21
commit
ca2351124b
src
coordinator
query/v2
@ -206,12 +206,20 @@ std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) {
|
||||
return in;
|
||||
}
|
||||
|
||||
Shards ShardMap::GetShards(const LabelName &label) {
|
||||
Shards ShardMap::GetShardsForLabel(const LabelName &label) const {
|
||||
const auto id = labels.at(label);
|
||||
auto &shards = label_spaces.at(id).shards;
|
||||
const auto &shards = label_spaces.at(id).shards;
|
||||
return shards;
|
||||
}
|
||||
|
||||
std::vector<Shards> ShardMap::GetAllShards() const {
|
||||
std::vector<Shards> all_shards;
|
||||
all_shards.reserve(label_spaces.size());
|
||||
std::transform(label_spaces.begin(), label_spaces.end(), std::back_inserter(all_shards),
|
||||
[](const auto &label_space) { return label_space.second.shards; });
|
||||
return all_shards;
|
||||
}
|
||||
|
||||
// TODO(gabor) later we will want to update the wallclock time with
|
||||
// the given Io<impl>'s time as well
|
||||
Hlc ShardMap::IncrementShardMapVersion() noexcept {
|
||||
@ -361,7 +369,7 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetLabelName(const LabelId label) const {
|
||||
const std::string &ShardMap::GetLabelName(const LabelId label) const {
|
||||
if (const auto it =
|
||||
std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; });
|
||||
it != labels.end()) {
|
||||
@ -378,7 +386,7 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetPropertyName(const PropertyId property) const {
|
||||
const std::string &ShardMap::GetPropertyName(const PropertyId property) const {
|
||||
if (const auto it = std::ranges::find_if(
|
||||
properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; });
|
||||
it != properties.end()) {
|
||||
@ -395,7 +403,7 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
|
||||
const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
|
||||
if (const auto it = std::ranges::find_if(
|
||||
edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; });
|
||||
it != edge_types.end()) {
|
||||
|
@ -127,7 +127,9 @@ struct ShardMap {
|
||||
[[nodiscard]] static ShardMap Parse(std::istream &input_stream);
|
||||
friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);
|
||||
|
||||
Shards GetShards(const LabelName &label);
|
||||
Shards GetShardsForLabel(const LabelName &label) const;
|
||||
|
||||
std::vector<Shards> GetAllShards() const;
|
||||
|
||||
// TODO(gabor) later we will want to update the wallclock time with
|
||||
// the given Io<impl>'s time as well
|
||||
@ -146,11 +148,11 @@ struct ShardMap {
|
||||
|
||||
std::optional<LabelId> GetLabelId(const std::string &label) const;
|
||||
// TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper
|
||||
std::string GetLabelName(LabelId label) const;
|
||||
const std::string &GetLabelName(LabelId label) const;
|
||||
std::optional<PropertyId> GetPropertyId(const std::string &property_name) const;
|
||||
std::string GetPropertyName(PropertyId property) const;
|
||||
const std::string &GetPropertyName(PropertyId property) const;
|
||||
std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const;
|
||||
std::string GetEdgeTypeName(EdgeTypeId property) const;
|
||||
const std::string &GetEdgeTypeName(EdgeTypeId property) const;
|
||||
|
||||
Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const;
|
||||
|
||||
|
@ -381,6 +381,8 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
|
||||
|
||||
if (current_vertex_it == current_batch.end()) {
|
||||
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
|
||||
ResetExecutionState();
|
||||
@ -399,7 +401,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
current_batch.clear();
|
||||
current_vertex_it = current_batch.end();
|
||||
request_state_ = msgs::ExecutionState<msgs::ScanVerticesRequest>{};
|
||||
request_state_.label = "label";
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
|
@ -52,7 +52,8 @@ class VertexCountCache {
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool LabelIndexExists(storage::v3::LabelId /*label*/) { return false; }
|
||||
// For now return true if label is primary label
|
||||
bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); }
|
||||
|
||||
bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; }
|
||||
|
||||
|
@ -129,6 +129,7 @@ class ShardRequestManagerInterface {
|
||||
virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0;
|
||||
virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0;
|
||||
virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0;
|
||||
virtual bool IsPrimaryLabel(LabelId label) const = 0;
|
||||
virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0;
|
||||
};
|
||||
|
||||
@ -222,17 +223,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
return shards_map_.GetLabelId(name).value();
|
||||
}
|
||||
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override {
|
||||
static std::string str{"dummy__prop"};
|
||||
return str;
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
|
||||
return shards_map_.GetPropertyName(prop);
|
||||
}
|
||||
const std::string &LabelToName(memgraph::storage::v3::LabelId /*label*/) const override {
|
||||
static std::string str{"dummy__label"};
|
||||
return str;
|
||||
const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
|
||||
return shards_map_.GetLabelName(label);
|
||||
}
|
||||
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId /*type*/) const override {
|
||||
static std::string str{"dummy__edgetype"};
|
||||
return str;
|
||||
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
|
||||
return shards_map_.GetEdgeTypeName(type);
|
||||
}
|
||||
|
||||
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
|
||||
@ -244,9 +242,10 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}) != schema_it->second.end();
|
||||
}
|
||||
|
||||
bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); }
|
||||
|
||||
// TODO(kostasrim) Simplify return result
|
||||
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
|
||||
spdlog::info("shards_map_.size(): {}", shards_map_.GetShards(*state.label).size());
|
||||
MaybeInitializeExecutionState(state);
|
||||
std::vector<ScanVerticesResponse> responses;
|
||||
|
||||
@ -455,15 +454,26 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
if (ShallNotInitializeState(state)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<coordinator::Shards> multi_shards;
|
||||
state.transaction_id = transaction_id_;
|
||||
auto shards = shards_map_.GetShards(*state.label);
|
||||
for (auto &[key, shard] : shards) {
|
||||
MG_ASSERT(!shard.empty());
|
||||
state.shard_cache.push_back(std::move(shard));
|
||||
ScanVerticesRequest rqst;
|
||||
rqst.transaction_id = transaction_id_;
|
||||
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
|
||||
state.requests.push_back(std::move(rqst));
|
||||
if (!state.label) {
|
||||
multi_shards = shards_map_.GetAllShards();
|
||||
} else {
|
||||
const auto label_id = shards_map_.GetLabelId(*state.label);
|
||||
MG_ASSERT(label_id);
|
||||
MG_ASSERT(IsPrimaryLabel(*label_id));
|
||||
multi_shards = {shards_map_.GetShardsForLabel(*state.label)};
|
||||
}
|
||||
for (auto &shards : multi_shards) {
|
||||
for (auto &[key, shard] : shards) {
|
||||
MG_ASSERT(!shard.empty());
|
||||
state.shard_cache.push_back(std::move(shard));
|
||||
ScanVerticesRequest rqst;
|
||||
rqst.transaction_id = transaction_id_;
|
||||
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
|
||||
state.requests.push_back(std::move(rqst));
|
||||
}
|
||||
}
|
||||
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
|
||||
}
|
||||
@ -521,11 +531,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) {
|
||||
int64_t shard_idx = 0;
|
||||
for (const auto &request : state.requests) {
|
||||
auto &storage_client =
|
||||
GetStorageClientForShard(*state.label, storage::conversions::ConvertPropertyVector(request.start_id.second));
|
||||
const auto &current_shard = state.shard_cache[shard_idx];
|
||||
|
||||
auto &storage_client = GetStorageClientForShard(current_shard);
|
||||
ReadRequests req = request;
|
||||
storage_client.SendAsyncReadRequest(request);
|
||||
|
||||
++shard_idx;
|
||||
}
|
||||
}
|
||||
|
||||
@ -647,8 +661,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto &storage_client = GetStorageClientForShard(
|
||||
*state.label, storage::conversions::ConvertPropertyVector(state.requests[request_idx].start_id.second));
|
||||
auto &storage_client = GetStorageClientForShard(*shard_it);
|
||||
|
||||
auto await_result = storage_client.AwaitAsyncReadRequest();
|
||||
|
||||
if (!await_result) {
|
||||
|
Loading…
Reference in New Issue
Block a user