Merge branch 'project-pineapples' into T1083-MG-limit-and-order-expand-one_v3
This commit is contained in:
commit
8e7118efde
@ -206,12 +206,20 @@ std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map) {
|
||||
return in;
|
||||
}
|
||||
|
||||
Shards ShardMap::GetShards(const LabelName &label) {
|
||||
Shards ShardMap::GetShardsForLabel(const LabelName &label) const {
|
||||
const auto id = labels.at(label);
|
||||
auto &shards = label_spaces.at(id).shards;
|
||||
const auto &shards = label_spaces.at(id).shards;
|
||||
return shards;
|
||||
}
|
||||
|
||||
std::vector<Shards> ShardMap::GetAllShards() const {
|
||||
std::vector<Shards> all_shards;
|
||||
all_shards.reserve(label_spaces.size());
|
||||
std::transform(label_spaces.begin(), label_spaces.end(), std::back_inserter(all_shards),
|
||||
[](const auto &label_space) { return label_space.second.shards; });
|
||||
return all_shards;
|
||||
}
|
||||
|
||||
// TODO(gabor) later we will want to update the wallclock time with
|
||||
// the given Io<impl>'s time as well
|
||||
Hlc ShardMap::IncrementShardMapVersion() noexcept {
|
||||
@ -361,7 +369,7 @@ std::optional<LabelId> ShardMap::GetLabelId(const std::string &label) const {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetLabelName(const LabelId label) const {
|
||||
const std::string &ShardMap::GetLabelName(const LabelId label) const {
|
||||
if (const auto it =
|
||||
std::ranges::find_if(labels, [label](const auto &name_id_pair) { return name_id_pair.second == label; });
|
||||
it != labels.end()) {
|
||||
@ -378,7 +386,7 @@ std::optional<PropertyId> ShardMap::GetPropertyId(const std::string &property_na
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetPropertyName(const PropertyId property) const {
|
||||
const std::string &ShardMap::GetPropertyName(const PropertyId property) const {
|
||||
if (const auto it = std::ranges::find_if(
|
||||
properties, [property](const auto &name_id_pair) { return name_id_pair.second == property; });
|
||||
it != properties.end()) {
|
||||
@ -395,7 +403,7 @@ std::optional<EdgeTypeId> ShardMap::GetEdgeTypeId(const std::string &edge_type)
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::string ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
|
||||
const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const {
|
||||
if (const auto it = std::ranges::find_if(
|
||||
edge_types, [property](const auto &name_id_pair) { return name_id_pair.second == property; });
|
||||
it != edge_types.end()) {
|
||||
|
@ -127,7 +127,9 @@ struct ShardMap {
|
||||
[[nodiscard]] static ShardMap Parse(std::istream &input_stream);
|
||||
friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map);
|
||||
|
||||
Shards GetShards(const LabelName &label);
|
||||
Shards GetShardsForLabel(const LabelName &label) const;
|
||||
|
||||
std::vector<Shards> GetAllShards() const;
|
||||
|
||||
// TODO(gabor) later we will want to update the wallclock time with
|
||||
// the given Io<impl>'s time as well
|
||||
@ -146,11 +148,11 @@ struct ShardMap {
|
||||
|
||||
std::optional<LabelId> GetLabelId(const std::string &label) const;
|
||||
// TODO(antaljanosbenjamin): Remove this and instead use NameIdMapper
|
||||
std::string GetLabelName(LabelId label) const;
|
||||
const std::string &GetLabelName(LabelId label) const;
|
||||
std::optional<PropertyId> GetPropertyId(const std::string &property_name) const;
|
||||
std::string GetPropertyName(PropertyId property) const;
|
||||
const std::string &GetPropertyName(PropertyId property) const;
|
||||
std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const;
|
||||
std::string GetEdgeTypeName(EdgeTypeId property) const;
|
||||
const std::string &GetEdgeTypeName(EdgeTypeId property) const;
|
||||
|
||||
Shards GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const;
|
||||
|
||||
|
@ -381,6 +381,8 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt;
|
||||
|
||||
if (current_vertex_it == current_batch.end()) {
|
||||
if (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager)) {
|
||||
ResetExecutionState();
|
||||
@ -399,7 +401,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
current_batch.clear();
|
||||
current_vertex_it = current_batch.end();
|
||||
request_state_ = msgs::ExecutionState<msgs::ScanVerticesRequest>{};
|
||||
request_state_.label = "label";
|
||||
}
|
||||
|
||||
void Reset() override {
|
||||
|
@ -52,7 +52,8 @@ class VertexCountCache {
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool LabelIndexExists(storage::v3::LabelId /*label*/) { return false; }
|
||||
// For now return true if label is primary label
|
||||
bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); }
|
||||
|
||||
bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; }
|
||||
|
||||
|
@ -129,6 +129,7 @@ class ShardRequestManagerInterface {
|
||||
virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0;
|
||||
virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0;
|
||||
virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0;
|
||||
virtual bool IsPrimaryLabel(LabelId label) const = 0;
|
||||
virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0;
|
||||
};
|
||||
|
||||
@ -222,17 +223,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
return shards_map_.GetLabelId(name).value();
|
||||
}
|
||||
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override {
|
||||
static std::string str{"dummy__prop"};
|
||||
return str;
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override {
|
||||
return shards_map_.GetPropertyName(prop);
|
||||
}
|
||||
const std::string &LabelToName(memgraph::storage::v3::LabelId /*label*/) const override {
|
||||
static std::string str{"dummy__label"};
|
||||
return str;
|
||||
const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override {
|
||||
return shards_map_.GetLabelName(label);
|
||||
}
|
||||
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId /*type*/) const override {
|
||||
static std::string str{"dummy__edgetype"};
|
||||
return str;
|
||||
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override {
|
||||
return shards_map_.GetEdgeTypeName(type);
|
||||
}
|
||||
|
||||
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
|
||||
@ -244,9 +242,10 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}) != schema_it->second.end();
|
||||
}
|
||||
|
||||
bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); }
|
||||
|
||||
// TODO(kostasrim) Simplify return result
|
||||
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
|
||||
spdlog::info("shards_map_.size(): {}", shards_map_.GetShards(*state.label).size());
|
||||
MaybeInitializeExecutionState(state);
|
||||
std::vector<ScanVerticesResponse> responses;
|
||||
|
||||
@ -455,15 +454,26 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
if (ShallNotInitializeState(state)) {
|
||||
return;
|
||||
}
|
||||
|
||||
std::vector<coordinator::Shards> multi_shards;
|
||||
state.transaction_id = transaction_id_;
|
||||
auto shards = shards_map_.GetShards(*state.label);
|
||||
for (auto &[key, shard] : shards) {
|
||||
MG_ASSERT(!shard.empty());
|
||||
state.shard_cache.push_back(std::move(shard));
|
||||
ScanVerticesRequest rqst;
|
||||
rqst.transaction_id = transaction_id_;
|
||||
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
|
||||
state.requests.push_back(std::move(rqst));
|
||||
if (!state.label) {
|
||||
multi_shards = shards_map_.GetAllShards();
|
||||
} else {
|
||||
const auto label_id = shards_map_.GetLabelId(*state.label);
|
||||
MG_ASSERT(label_id);
|
||||
MG_ASSERT(IsPrimaryLabel(*label_id));
|
||||
multi_shards = {shards_map_.GetShardsForLabel(*state.label)};
|
||||
}
|
||||
for (auto &shards : multi_shards) {
|
||||
for (auto &[key, shard] : shards) {
|
||||
MG_ASSERT(!shard.empty());
|
||||
state.shard_cache.push_back(std::move(shard));
|
||||
ScanVerticesRequest rqst;
|
||||
rqst.transaction_id = transaction_id_;
|
||||
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
|
||||
state.requests.push_back(std::move(rqst));
|
||||
}
|
||||
}
|
||||
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
|
||||
}
|
||||
@ -521,11 +531,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
}
|
||||
|
||||
void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) {
|
||||
int64_t shard_idx = 0;
|
||||
for (const auto &request : state.requests) {
|
||||
auto &storage_client =
|
||||
GetStorageClientForShard(*state.label, storage::conversions::ConvertPropertyVector(request.start_id.second));
|
||||
const auto ¤t_shard = state.shard_cache[shard_idx];
|
||||
|
||||
auto &storage_client = GetStorageClientForShard(current_shard);
|
||||
ReadRequests req = request;
|
||||
storage_client.SendAsyncReadRequest(request);
|
||||
|
||||
++shard_idx;
|
||||
}
|
||||
}
|
||||
|
||||
@ -647,8 +661,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
|
||||
continue;
|
||||
}
|
||||
|
||||
auto &storage_client = GetStorageClientForShard(
|
||||
*state.label, storage::conversions::ConvertPropertyVector(state.requests[request_idx].start_id.second));
|
||||
auto &storage_client = GetStorageClientForShard(*shard_it);
|
||||
|
||||
auto await_result = storage_client.AwaitAsyncReadRequest();
|
||||
|
||||
if (!await_result) {
|
||||
|
Loading…
Reference in New Issue
Block a user