React to PR comments

This commit is contained in:
gvolfing 2023-01-20 08:08:45 +01:00
parent 5eea3ceee4
commit cf76e0e19b
10 changed files with 133 additions and 199 deletions

View File

@ -200,10 +200,10 @@ class ValidFramesConsumer {
explicit ValidFramesConsumer(MultiFrame &multiframe);
~ValidFramesConsumer() noexcept;
ValidFramesConsumer(const ValidFramesConsumer &other) = default;
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = default;
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = default;
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = default;
ValidFramesConsumer(const ValidFramesConsumer &other) = delete;
ValidFramesConsumer(ValidFramesConsumer &&other) noexcept = delete;
ValidFramesConsumer &operator=(const ValidFramesConsumer &other) = delete;
ValidFramesConsumer &operator=(ValidFramesConsumer &&other) noexcept = delete;
struct Iterator {
using iterator_category = std::forward_iterator_tag;

View File

@ -465,194 +465,86 @@ class DistributedScanAllAndFilterCursor : public Cursor {
class DistributedScanAllByPrimaryKeyCursor : public Cursor {
public:
explicit DistributedScanAllByPrimaryKeyCursor(
Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name,
std::optional<storage::v3::LabelId> label,
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair,
std::optional<std::vector<Expression *>> filter_expressions, std::optional<std::vector<Expression *>> primary_key)
explicit DistributedScanAllByPrimaryKeyCursor(Symbol output_symbol, UniqueCursorPtr input_cursor, const char *op_name,
storage::v3::LabelId label,
std::optional<std::vector<Expression *>> filter_expressions,
std::vector<Expression *> primary_key)
: output_symbol_(output_symbol),
input_cursor_(std::move(input_cursor)),
op_name_(op_name),
label_(label),
property_expression_pair_(property_expression_pair),
filter_expressions_(filter_expressions),
primary_key_(primary_key) {
ResetExecutionState();
}
primary_key_(primary_key) {}
enum class State : int8_t { INITIALIZING, COMPLETED };
using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) {
{
SCOPED_REQUEST_WAIT_PROFILE;
std::optional<std::string> request_label;
if (label_.has_value()) {
request_label = request_router.LabelToName(*label_);
}
current_batch_ = request_router.ScanVertices(request_label);
std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router,
ExecutionContext &context) {
// Evaluate the expressions that hold the PrimaryKey.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
storage::v3::View::NEW);
std::vector<msgs::Value> pk;
for (auto *primary_property : primary_key_) {
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
}
current_vertex_it_ = current_batch_.begin();
request_state_ = State::COMPLETED;
return !current_batch_.empty();
}
bool MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) {
{
msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
std::optional<std::string> request_label = std::nullopt;
if (label_.has_value()) {
request_label = request_router.LabelToName(*label_);
}
return request_router.GetProperties(req);
});
MG_ASSERT(get_prop_result.size() <= 1);
// Evaluate the expressions that hold the PrimaryKey.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
storage::v3::View::NEW);
std::vector<msgs::Value> pk;
MG_ASSERT(primary_key_);
for (auto *primary_property : *primary_key_) {
pk.push_back(TypedValueToValue(primary_key->Accept(evaluator)));
}
msgs::Label label = {.id = msgs::LabelId::FromUint(label_->AsUint())};
msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
auto get_prop_result = request_router.GetProperties(req);
MG_ASSERT(get_prop_result.size() <= 1);
if (get_prop_result.empty()) {
current_batch_ = std::vector<VertexAccessor>{};
} else {
auto properties = get_prop_result[0].props;
// TODO (gvolfing) figure out labels when relevant.
msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
current_batch_ = {VertexAccessor(vertex, properties, &request_router)};
}
if (get_prop_result.empty()) {
return std::nullopt;
}
current_vertex_it_ = current_batch_.begin();
request_state_ = State::COMPLETED;
return !current_batch_.empty();
auto properties = get_prop_result[0].props;
// TODO (gvolfing) figure out labels when relevant.
msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
return VertexAccessor(vertex, properties, &request_router);
}
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
if (MustAbort(context)) {
throw HintedAbortError();
}
if (!input_cursor_->Pull(frame, context)) {
return false;
}
auto &request_router = *context.request_router;
while (true) {
if (MustAbort(context)) {
throw HintedAbortError();
}
if (request_state_ == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
return false;
}
}
if (current_vertex_it_ == current_batch_.end() &&
(request_state_ == State::COMPLETED || !MakeRequestSingleFrame(frame, request_router, context))) {
ResetExecutionState();
continue;
}
frame[output_symbol_] = TypedValue(std::move(*current_vertex_it_));
++current_vertex_it_;
auto vertex = MakeRequestSingleFrame(frame, request_router, context);
if (vertex) {
frame[output_symbol_] = TypedValue(std::move(*vertex));
return true;
}
}
void PrepareNextFrames(ExecutionContext &context) {
auto &request_router = *context.request_router;
input_cursor_->PullMultiple(*own_multi_frames_, context);
valid_frames_consumer_ = own_multi_frames_->GetValidFramesConsumer();
valid_frames_it_ = valid_frames_consumer_->begin();
MakeRequest(request_router, context);
}
inline bool HasNextFrame() {
return current_vertex_it_ != current_batch_.end() && valid_frames_it_ != valid_frames_consumer_->end();
}
FrameWithValidity GetNextFrame(ExecutionContext &context) {
MG_ASSERT(HasNextFrame());
auto frame = *valid_frames_it_;
frame[output_symbol_] = TypedValue(*current_vertex_it_);
++current_vertex_it_;
if (current_vertex_it_ == current_batch_.end()) {
valid_frames_it_->MakeInvalid();
++valid_frames_it_;
if (valid_frames_it_ == valid_frames_consumer_->end()) {
PrepareNextFrames(context);
} else {
current_vertex_it_ = current_batch_.begin();
}
};
return frame;
return false;
}
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
if (!own_multi_frames_.has_value()) {
own_multi_frames_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(),
kNumberOfFramesInMultiframe, input_multi_frame.GetMemoryResource()));
PrepareNextFrames(context);
}
while (true) {
if (MustAbort(context)) {
throw HintedAbortError();
}
auto invalid_frames_populator = input_multi_frame.GetInvalidFramesPopulator();
auto invalid_frame_it = invalid_frames_populator.begin();
auto has_modified_at_least_one_frame = false;
while (invalid_frames_populator.end() != invalid_frame_it && HasNextFrame()) {
has_modified_at_least_one_frame = true;
*invalid_frame_it = GetNextFrame(context);
++invalid_frame_it;
}
if (!has_modified_at_least_one_frame) {
return;
}
}
throw utils::NotYetImplemented("Multiframe version of ScanAllByPrimaryKey is yet to be implemented.");
};
void Reset() override { input_cursor_->Reset(); }
void Shutdown() override { input_cursor_->Shutdown(); }
void ResetExecutionState() {
current_batch_.clear();
current_vertex_it_ = current_batch_.end();
request_state_ = State::INITIALIZING;
}
void Reset() override {
input_cursor_->Reset();
ResetExecutionState();
}
private:
const Symbol output_symbol_;
const UniqueCursorPtr input_cursor_;
const char *op_name_;
std::vector<VertexAccessor> current_batch_;
std::vector<VertexAccessor>::iterator current_vertex_it_;
State request_state_ = State::INITIALIZING;
std::optional<storage::v3::LabelId> label_;
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
storage::v3::LabelId label_;
std::optional<std::vector<Expression *>> filter_expressions_;
std::optional<std::vector<Expression *>> primary_key_;
std::vector<Expression *> primary_key_;
std::optional<MultiFrame> own_multi_frames_;
std::optional<ValidFramesConsumer> valid_frames_consumer_;
ValidFramesConsumer::Iterator valid_frames_it_;
@ -767,9 +659,9 @@ ACCEPT_WITH_INPUT(ScanAllByPrimaryKey)
UniqueCursorPtr ScanAllByPrimaryKey::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::ScanAllByPrimaryKeyOperator);
return MakeUniqueCursorPtr<DistributedScanAllByPrimaryKeyCursor>(
mem, output_symbol_, input_->MakeCursor(mem), "ScanAllByPrimaryKey", label_,
std::nullopt /*property_expression_pair*/, std::nullopt /*filter_expressions*/, primary_key_);
return MakeUniqueCursorPtr<DistributedScanAllByPrimaryKeyCursor>(mem, output_symbol_, input_->MakeCursor(mem),
"ScanAllByPrimaryKey", label_,
std::nullopt /*filter_expressions*/, primary_key_);
}
Expand::Expand(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbol, Symbol node_symbol,

View File

@ -67,7 +67,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelPropertyValue &) override;
bool PreVisit(ScanAllByLabelPropertyRange &) override;
bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllByPrimaryKey & /*unused*/) override;
bool PreVisit(ScanAllByPrimaryKey &) override;
bool PreVisit(Expand &) override;
bool PreVisit(ExpandVariable &) override;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -35,6 +35,7 @@ PRE_VISIT(ScanAllByLabel, RWType::R, true)
PRE_VISIT(ScanAllByLabelPropertyRange, RWType::R, true)
PRE_VISIT(ScanAllByLabelPropertyValue, RWType::R, true)
PRE_VISIT(ScanAllByLabelProperty, RWType::R, true)
PRE_VISIT(ScanAllByPrimaryKey, RWType::R, true)
PRE_VISIT(Expand, RWType::R, true)
PRE_VISIT(ExpandVariable, RWType::R, true)

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -59,6 +59,7 @@ class ReadWriteTypeChecker : public virtual HierarchicalLogicalOperatorVisitor {
bool PreVisit(ScanAllByLabelPropertyValue &) override;
bool PreVisit(ScanAllByLabelPropertyRange &) override;
bool PreVisit(ScanAllByLabelProperty &) override;
bool PreVisit(ScanAllByPrimaryKey &) override;
bool PreVisit(Expand &) override;
bool PreVisit(ExpandVariable &) override;

View File

@ -569,8 +569,8 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
const auto &modified_symbols = scan.ModifiedSymbols(*symbol_table_);
std::unordered_set<Symbol> bound_symbols(modified_symbols.begin(), modified_symbols.end());
// Try to see if we can use label+property index. If not, try to use
// just the label index.
// Try to see if we can use label + primary-key or label + property index.
// If not, try to use just the label index.
const auto labels = filters_.FilteredLabels(node_symbol);
if (labels.empty()) {
// Without labels, we cannot generate any indexed ScanAll.
@ -583,19 +583,57 @@ class IndexLookupRewriter final : public HierarchicalLogicalOperatorVisitor {
query::v2::LabelIx prim_label;
std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> primary_key;
// Builds the primary-key lookup for `label` out of the given property filters.
//
// A filter is kept when its property is one of the properties in the schema returned by
// db_->GetSchemaForLabel(label). The kept filters are emitted in the order in which their
// properties appear in that schema; each entry pairs the filter's expression with the
// filter itself.
//
// Returns an empty vector unless the number of kept filters equals the number of schema
// properties.
auto extract_primary_key = [this](storage::v3::LabelId label,
                                  const std::vector<query::v2::plan::FilterInfo> &property_filters)
    -> std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> {
  const std::vector<memgraph::storage::v3::SchemaProperty> schema = db_->GetSchemaForLabel(label);

  // Resolve every filter's property id exactly once and remember only the filters that
  // refer to a schema property. Pointers stay valid because `property_filters` is not
  // modified for the duration of this call.
  std::vector<std::pair<storage::v3::PropertyId, const query::v2::plan::FilterInfo *>> candidates;
  candidates.reserve(property_filters.size());
  for (const auto &property_filter : property_filters) {
    const storage::v3::PropertyId property_id =
        db_->NameToProperty(property_filter.property_filter->property_.name);
    const bool is_schema_property =
        std::any_of(schema.begin(), schema.end(),
                    [&property_id](const auto &schema_elem) { return schema_elem.property_id == property_id; });
    if (is_schema_property) {
      candidates.emplace_back(property_id, &property_filter);
    }
  }

  // Make sure pk is in the same order as the schema properties.
  std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> pk;
  pk.reserve(candidates.size());
  for (const auto &schema_elem : schema) {
    for (const auto &[property_id, filter] : candidates) {
      if (schema_elem.property_id == property_id) {
        pk.emplace_back(filter->expression, *filter);
      }
    }
  }

  MG_ASSERT(pk.size() == candidates.size(),
            "The two vectors should represent the same primary key with a possibly different order of contained "
            "elements.");

  // Only a complete primary key is usable; a partial match yields nothing.
  if (pk.size() != schema.size()) {
    return {};
  }
  return pk;
};
if (!property_filters.empty()) {
for (const auto &label : labels) {
if (db_->LabelIndexExists(GetLabel(label))) {
if (db_->PrimaryLabelExists(GetLabel(label))) {
prim_label = label;
primary_key = db_->ExtractPrimaryKey(GetLabel(prim_label), property_filters);
primary_key = extract_primary_key(GetLabel(prim_label), property_filters);
break;
}
}
if (!primary_key.empty()) {
// Mark the expressions so they won't be used for an additional, unnecessary filter.
for (const auto &primary_property : primary_key) {
filter_exprs_for_removal_.insert(pk.first);
filters_.EraseFilter(pk.second);
filter_exprs_for_removal_.insert(primary_property.first);
filters_.EraseFilter(primary_property.second);
}
EraseLabelFilters(node_symbol, prim_label);
std::vector<query::v2::Expression *> pk_expressions;

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -22,6 +22,7 @@
#include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp"
#include "utils/bound.hpp"
#include "utils/exceptions.hpp"
#include "utils/fnv.hpp"
namespace memgraph::query::v2::plan {
@ -54,31 +55,16 @@ class VertexCountCache {
return 1;
}
bool LabelIndexExists(storage::v3::LabelId label) { return request_router_->IsPrimaryLabel(label); }
// Never returns a value: unconditionally throws utils::NotYetImplemented.
// The parameter name is commented out because the argument is not used.
bool LabelIndexExists(storage::v3::LabelId /*label*/) {
  throw utils::NotYetImplemented("Label indicies are yet to be implemented.");
}
// Forwards the question "is `label` a primary label?" to the request router.
bool PrimaryLabelExists(storage::v3::LabelId label) {
  const bool is_primary = request_router_->IsPrimaryLabel(label);
  return is_primary;
}
// Always reports false, regardless of the label and property passed in.
bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) {
  constexpr bool kIndexExists = false;
  return kIndexExists;
}
std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> ExtractPrimaryKey(
storage::v3::LabelId label, std::vector<query::v2::plan::FilterInfo> property_filters) {
std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> pk;
const auto schema = request_router_->GetSchemaForLabel(label);
std::vector<storage::v3::PropertyId> schema_properties;
schema_properties.reserve(schema.size());
std::transform(schema.begin(), schema.end(), std::back_inserter(schema_properties),
[](const auto &schema_elem) { return schema_elem.property_id; });
for (const auto &property_filter : property_filters) {
const auto &property_id = NameToProperty(property_filter.property_filter->property_.name);
if (std::find(schema_properties.begin(), schema_properties.end(), property_id) != schema_properties.end()) {
pk.emplace_back(std::make_pair(property_filter.expression, property_filter));
}
}
return pk.size() == schema_properties.size()
? pk
: std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>>{};
// Returns the schema for `label` exactly as provided by the request router.
std::vector<memgraph::storage::v3::SchemaProperty> GetSchemaForLabel(storage::v3::LabelId label) {
  std::vector<memgraph::storage::v3::SchemaProperty> schema = request_router_->GetSchemaForLabel(label);
  return schema;
}
RequestRouterInterface *request_router_;

View File

@ -539,12 +539,12 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) {
const VertexAccessor &v_acc,
const std::optional<EdgeAccessor> &e_acc) -> ShardResult<std::map<PropertyId, Value>> {
if (!req.property_ids) {
const auto *schema = shard_->GetSchema(shard_->PrimaryLabel());
MG_ASSERT(schema);
if (e_acc) {
return CollectAllPropertiesFromAccessor(*e_acc, view);
}
const auto *schema = shard_->GetSchema(shard_->PrimaryLabel());
MG_ASSERT(schema);
return CollectAllPropertiesFromAccessor(v_acc, view, *schema);
}

View File

@ -486,7 +486,6 @@ auto GetForeach(AstStorage &storage, NamedExpression *named_expr, const std::vec
#define EDGE(...) memgraph::query::test_common::GetEdge(storage, __VA_ARGS__)
#define EDGE_VARIABLE(...) memgraph::query::test_common::GetEdgeVariable(storage, __VA_ARGS__)
#define PATTERN(...) memgraph::query::test_common::GetPattern(storage, {__VA_ARGS__})
#define PATTERN(...) memgraph::query::test_common::GetPattern(storage, {__VA_ARGS__})
#define NAMED_PATTERN(name, ...) memgraph::query::test_common::GetPattern(storage, name, {__VA_ARGS__})
#define OPTIONAL_MATCH(...) \
memgraph::query::test_common::GetWithPatterns(storage.Create<memgraph::query::Match>(true), {__VA_ARGS__})

View File

@ -17,6 +17,7 @@
#include "query/v2/plan/operator.hpp"
#include "query/v2/plan/planner.hpp"
#include "query/v2/plan/preprocess.hpp"
#include "utils/exceptions.hpp"
namespace memgraph::query::v2::plan {
@ -289,7 +290,7 @@ class FakeDistributedDbAccessor {
}
bool LabelIndexExists(memgraph::storage::v3::LabelId label) const {
return label_index_.find(label) != label_index_.end();
throw utils::NotYetImplemented("Label indicies are yet to be implemented.");
}
bool LabelPropertyIndexExists(memgraph::storage::v3::LabelId label,
@ -302,6 +303,8 @@ class FakeDistributedDbAccessor {
return false;
}
// A label counts as primary here when it has an entry in label_index_.
bool PrimaryLabelExists(storage::v3::LabelId label) {
  const auto entry = label_index_.find(label);
  const bool is_missing = (entry == label_index_.end());
  return !is_missing;
}
// Stores `count` under `label` in label_index_, creating the entry if it is absent and
// overwriting it otherwise.
void SetIndexCount(memgraph::storage::v3::LabelId label, int64_t count) {
  auto &stored_count = label_index_[label];
  stored_count = count;
}
void SetIndexCount(memgraph::storage::v3::LabelId label, memgraph::storage::v3::PropertyId property, int64_t count) {
@ -382,13 +385,27 @@ class FakeDistributedDbAccessor {
return memgraph::storage::v3::PropertyId::FromUint(0);
}
// Converts the property ids stored in schemas_ for `label` into SchemaProperty entries,
// preserving their order.
//
// Every entry gets the type SchemaType::INT. Looking up a label with no entry in
// schemas_ goes through `at()`, so the failure mode is whatever `schemas_.at` does for a
// missing key.
std::vector<memgraph::storage::v3::SchemaProperty> GetSchemaForLabel(storage::v3::LabelId label) {
  // Bind by reference: the stored ids are only read, so there is no need to copy them.
  const auto &property_ids = schemas_.at(label);

  std::vector<memgraph::storage::v3::SchemaProperty> schema;
  schema.reserve(property_ids.size());
  for (const auto &property_id : property_ids) {
    memgraph::storage::v3::SchemaProperty schema_prop = {
        .property_id = property_id,
        // This should not be hardcoded, but for testing purposes it will suffice.
        .type = memgraph::common::SchemaType::INT};
    schema.push_back(schema_prop);
  }
  return schema;
}
std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> ExtractPrimaryKey(
storage::v3::LabelId label, std::vector<query::v2::plan::FilterInfo> property_filters) {
MG_ASSERT(schemas_.contains(label),
"You did not specify the Schema for this label! Use FakeDistributedDbAccessor::CreateSchema(...).");
std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>> pk;
const auto schema = GetSchemaForLabel(label);
const auto schema = GetSchemaPropertiesForLabel(label);
std::vector<storage::v3::PropertyId> schema_properties;
schema_properties.reserve(schema.size());
@ -408,7 +425,7 @@ class FakeDistributedDbAccessor {
: std::vector<std::pair<query::v2::Expression *, query::v2::plan::FilterInfo>>{};
}
std::vector<memgraph::storage::v3::PropertyId> GetSchemaForLabel(storage::v3::LabelId label) {
std::vector<memgraph::storage::v3::PropertyId> GetSchemaPropertiesForLabel(storage::v3::LabelId label) {
return schemas_.at(label);
}