Merge pull request #831 from memgraph/join-in-optional

- Add id to Frame
This commit is contained in:
Jure Bajic 2023-03-29 15:42:17 +02:00 committed by GitHub
commit 1b11a699d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 251 additions and 82 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2022 Memgraph Ltd.
// Copyright 2023 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
@ -47,12 +47,12 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
TypedValue Visit(NamedExpression &named_expression) override {
const auto &symbol = symbol_table_->at(named_expression);
auto value = named_expression.expression_->Accept(*this);
frame_->at(symbol) = value;
frame_->At(symbol) = value;
return value;
}
TypedValue Visit(Identifier &ident) override {
return TypedValue(frame_->at(symbol_table_->at(ident)), ctx_->memory);
return TypedValue(frame_->At(symbol_table_->at(ident)), ctx_->memory);
}
// NOLINTNEXTLINE(cppcoreguidelines-macro-usage)
@ -470,7 +470,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
}
TypedValue Visit(Aggregation &aggregation) override {
return TypedValue(frame_->at(symbol_table_->at(aggregation)), ctx_->memory);
return TypedValue(frame_->At(symbol_table_->at(aggregation)), ctx_->memory);
}
TypedValue Visit(Coalesce &coalesce) override {
@ -528,8 +528,8 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
const auto &accumulator_symbol = symbol_table_->at(*reduce.accumulator_);
auto accumulator = reduce.initializer_->Accept(*this);
for (const auto &element : list) {
frame_->at(accumulator_symbol) = accumulator;
frame_->at(element_symbol) = element;
frame_->At(accumulator_symbol) = accumulator;
frame_->At(element_symbol) = element;
accumulator = reduce.expression_->Accept(*this);
}
return accumulator;
@ -551,7 +551,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
if (element.IsNull()) {
result.emplace_back();
} else {
frame_->at(element_symbol) = element;
frame_->At(element_symbol) = element;
result.emplace_back(extract.expression_->Accept(*this));
}
}
@ -571,7 +571,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
bool has_null_elements = false;
bool has_value = false;
for (const auto &element : list) {
frame_->at(symbol) = element;
frame_->At(symbol) = element;
auto result = all.where_->expression_->Accept(*this);
if (!result.IsNull() && result.type() != TypedValue::Type::Bool) {
throw ExpressionRuntimeException("Predicate of ALL must evaluate to boolean, got {}.", result.type());
@ -608,7 +608,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
bool has_value = false;
bool predicate_satisfied = false;
for (const auto &element : list) {
frame_->at(symbol) = element;
frame_->At(symbol) = element;
auto result = single.where_->expression_->Accept(*this);
if (!result.IsNull() && result.type() != TypedValue::Type::Bool) {
throw ExpressionRuntimeException("Predicate of SINGLE must evaluate to boolean, got {}.", result.type());
@ -645,7 +645,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
const auto &symbol = symbol_table_->at(*any.identifier_);
bool has_value = false;
for (const auto &element : list) {
frame_->at(symbol) = element;
frame_->At(symbol) = element;
auto result = any.where_->expression_->Accept(*this);
if (!result.IsNull() && result.type() != TypedValue::Type::Bool) {
throw ExpressionRuntimeException("Predicate of ANY must evaluate to boolean, got {}.", result.type());
@ -677,7 +677,7 @@ class ExpressionEvaluator : public ExpressionVisitor<TypedValue> {
const auto &symbol = symbol_table_->at(*none.identifier_);
bool has_value = false;
for (const auto &element : list) {
frame_->at(symbol) = element;
frame_->At(symbol) = element;
auto result = none.where_->expression_->Accept(*this);
if (!result.IsNull() && result.type() != TypedValue::Type::Bool) {
throw ExpressionRuntimeException("Predicate of NONE must evaluate to boolean, got {}.", result.type());

View File

@ -30,15 +30,18 @@ class Frame {
TypedValue &operator[](const Symbol &symbol) { return elems_[symbol.position()]; }
const TypedValue &operator[](const Symbol &symbol) const { return elems_[symbol.position()]; }
TypedValue &at(const Symbol &symbol) { return elems_.at(symbol.position()); }
const TypedValue &at(const Symbol &symbol) const { return elems_.at(symbol.position()); }
TypedValue &At(const Symbol &symbol) { return elems_.at(symbol.position()); }
const TypedValue &At(const Symbol &symbol) const { return elems_.at(symbol.position()); }
auto &elems() { return elems_; }
const auto &elems() const { return elems_; }
uint64_t Id() const { return id_; }
void SetId(const uint64_t id) { id_ = id; }
const utils::pmr::vector<TypedValue> &Elems() const { return elems_; }
utils::MemoryResource *GetMemoryResource() const { return elems_.get_allocator().GetMemoryResource(); }
private:
uint64_t id_{0U};
utils::pmr::vector<TypedValue> elems_;
};

View File

@ -338,18 +338,31 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
return false;
}
bool Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
bool Once::OnceCursor::PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("OnceMF");
if (!did_pull_) {
auto &first_frame = multi_frame.GetFirstFrame();
first_frame.MakeValid();
did_pull_ = true;
if (pushed_down_multi_frame_.has_value()) {
auto pushed_down_consumer = pushed_down_multi_frame_->GetValidFramesConsumer();
auto output_populator = output_multi_frame.GetInvalidFramesPopulator();
auto consumer_it = pushed_down_consumer.begin();
auto populator_it = output_populator.begin();
for (; consumer_it != pushed_down_consumer.end(); ++consumer_it, ++populator_it) {
MG_ASSERT(populator_it != output_populator.end());
*populator_it = std::move(*consumer_it);
}
} else {
auto &first_frame = output_multi_frame.GetFirstFrame();
first_frame.MakeValid();
}
return true;
}
return false;
}
/// Stores a copy of `multi_frame` in `pushed_down_multi_frame_`, replacing any
/// multi frame that was stored by an earlier call.
void Once::OnceCursor::PushDown(const MultiFrame &multi_frame) {
  pushed_down_multi_frame_.emplace(multi_frame);
}
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OnceOperator);
@ -508,7 +521,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
SCOPED_PROFILE_OP(op_name_);
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
@ -573,6 +586,8 @@ class DistributedScanAllAndFilterCursor : public Cursor {
return populated_any;
};
/// Forwards the pushed-down multi frame unchanged to the input cursor.
void PushDown(const MultiFrame &multi_frame) override {
  input_cursor_->PushDown(multi_frame);
}
/// Shuts down the input cursor; this cursor does nothing else on shutdown.
void Shutdown() override {
  input_cursor_->Shutdown();
}
void ResetExecutionState() {
@ -704,12 +719,12 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
}
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
MG_ASSERT(output_multi_frame.GetFirstFrame().Elems().size() == own_multi_frame_->GetFirstFrame().Elems().size());
}
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
@ -2104,6 +2119,8 @@ bool Optional::Accept(HierarchicalLogicalOperatorVisitor &visitor) {
return visitor.PostVisit(*this);
}
class OptionalCursor;
UniqueCursorPtr Optional::MakeCursor(utils::MemoryResource *mem) const {
EventCounter::IncrementCounter(EventCounter::OptionalOperator);
@ -2117,29 +2134,31 @@ std::vector<Symbol> Optional::ModifiedSymbols(const SymbolTable &table) const {
return symbols;
}
Optional::OptionalCursor::OptionalCursor(const Optional &self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), optional_cursor_(self.optional_->MakeCursor(mem)) {}
class OptionalCursor : public Cursor {
public:
OptionalCursor(const Optional &self, utils::MemoryResource *mem)
: self_(self), input_cursor_(self.input_->MakeCursor(mem)), optional_cursor_(self.optional_->MakeCursor(mem)) {}
bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("Optional");
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("Optional");
while (true) {
if (pull_input_) {
if (input_cursor_->Pull(frame, context)) {
// after a successful input from the input
// reset optional_ (it's expand iterators maintain state)
optional_cursor_->Reset();
} else
// input is exhausted, we're done
return false;
}
while (true) {
if (pull_input_) {
if (input_cursor_->Pull(frame, context)) {
// after a successful pull from the input
// reset optional_ (it's expand iterators maintain state)
optional_cursor_->Reset();
} else
// input is exhausted, we're done
return false;
}
// pull from the optional_ cursor
if (optional_cursor_->Pull(frame, context)) {
// if successful, next Pull from this should not pull_input_
pull_input_ = false;
return true;
} else {
// pull from the optional_ cursor
if (optional_cursor_->Pull(frame, context)) {
// if successful, next Pull from this should not pull_input_
pull_input_ = false;
return true;
}
// failed to Pull from the merge_match cursor
if (pull_input_) {
// if we have just now pulled from the input
@ -2153,21 +2172,180 @@ bool Optional::OptionalCursor::Pull(Frame &frame, ExecutionContext &context) {
// we have exhausted optional_cursor_ after 1 or more successful Pulls
// attempt next input_cursor_ pull
pull_input_ = true;
continue;
}
}
}
void Optional::OptionalCursor::Shutdown() {
input_cursor_->Shutdown();
optional_cursor_->Shutdown();
}
/// Combines the current batch of input frames (`own_multi_frame_`) with the frames produced by the optional
/// branch and writes the result into the output, starting at `output_frames_it`.
///
/// The function is a state machine over `optional_state_`:
///  - Pull: pulls the next batch from `optional_cursor_` into `optional_multi_frame_`. On failure the optional
///    branch is marked Exhausted; on success its frames become available and the state switches to Ready.
///  - Ready: walks the pulled optional frames and compares their ids with the id of the current input frame.
///    Matching frames are copied to the output. An input frame that has no matching optional frame is emitted
///    once with every symbol in `self_.optional_symbols_` set to a null TypedValue.
///  - Exhausted: every remaining input frame is emitted with the optional symbols set to null.
///
/// @param output_populator populator of the output multi frame; used only for its `end()`.
/// @param output_frames_it next free output slot; advanced once per emitted frame.
/// @param context execution context forwarded to the optional cursor and used for null value allocation.
/// @return the local `populated_any` flag, which is set each time the Ready loop processes an optional frame
///         or the Exhausted loop emits a frame.
bool HandleReadyInput(InvalidFramesPopulator &output_populator, InvalidFramesPopulator::Iterator &output_frames_it,
                      ExecutionContext &context) {
  bool populated_any = false;
  while (true) {
    switch (optional_state_) {
      case State::Pull: {
        if (!optional_cursor_->PullMultiple(*optional_multi_frame_, context)) {
          optional_state_ = State::Exhausted;
          optional_frames_consumer_.reset();
          optional_frames_it_ = {};
          if (populated_any) {
            ++own_frames_it_;
          }
        } else {
          optional_frames_consumer_ = optional_multi_frame_->GetValidFramesConsumer();
          optional_frames_it_ = optional_frames_consumer_->begin();
          optional_state_ = State::Ready;
        }
        break;
      }
      case State::Ready: {
        while (optional_frames_it_ != optional_frames_consumer_->end()) {
          if (output_frames_it == output_populator.end()) {
            // The output is full; the caller resumes from the current iterators on the next call.
            return populated_any;
          }
          populated_any = true;
          if (optional_frames_it_->Id() == own_frames_it_->Id()) {
            // The optional frame belongs to the current input frame: emit it.
            // This could be a move, but then we would need special logic in EnsureOwnMultiFramesAreGood.
            *output_frames_it = *optional_frames_it_;
            last_matched_frame_ = optional_frames_it_->Id();
            optional_frames_it_->MakeInvalid();
            ++optional_frames_it_;
            ++output_frames_it;
            if (optional_frames_it_ == optional_frames_consumer_->end()) {
              optional_state_ = State::Pull;
            }
          } else if (last_matched_frame_ == own_frames_it_->Id()) {
            // The current input frame was already emitted, so move on to the next input frame.
            // NOTE(review): PullMultiple resets last_matched_frame_ to 0 and frame ids also start at 0, so an
            // input frame with id 0 that has no match appears to take this branch as well - confirm.
            ++own_frames_it_;
          } else {
            // The current input frame has no match in the optional branch: emit it with null optional symbols.
            // TODO(antaljanosbenjamin): Remove (or improve the message of) this assert
            MG_ASSERT(optional_frames_it_->Id() > own_frames_it_->Id(), "This should be the case DELETE ME");
            for (const auto &symbol : self_.optional_symbols_) {
              spdlog::error("{}", symbol.name());
              (*own_frames_it_)[symbol] = TypedValue(context.evaluation_context.memory);
            }
            // This could be a move, but then we would need special logic in EnsureOwnMultiFramesAreGood.
            *output_frames_it = *own_frames_it_;
            last_matched_frame_ = own_frames_it_->Id();
            own_frames_it_->MakeInvalid();
            ++own_frames_it_;
            // Advance the output slot as the other emitting paths do; otherwise the next iteration would
            // overwrite the frame that was just written.
            ++output_frames_it;
          }
        }
        break;
      }
      case State::Exhausted: {
        while (own_frames_it_ != own_frames_consumer_->end() && output_frames_it != output_populator.end()) {
          MG_ASSERT(!optional_frames_consumer_.has_value(), "This should be the case DELETE ME");
          for (const auto &symbol : self_.optional_symbols_) {
            spdlog::error("{}", symbol.name());
            (*own_frames_it_)[symbol] = TypedValue(context.evaluation_context.memory);
          }
          // This could be a move, but then we would need special logic in EnsureOwnMultiFramesAreGood.
          *output_frames_it = *own_frames_it_;
          // Invalidate the frame that was just emitted BEFORE advancing. Advancing first would invalidate the
          // next input frame (or dereference the end iterator) and leave the emitted frame valid.
          own_frames_it_->MakeInvalid();
          ++own_frames_it_;
          ++output_frames_it;
          populated_any = true;
        }
        return populated_any;
      }
    }
  }
  return populated_any;
}
// Multi-frame pull for the Optional operator.
//
// Runs a state machine over input_state_ until the output multi frame is full or the input is exhausted:
//  - Pull: fetches the next batch of input frames into own_multi_frame_ and prepares the optional branch.
//  - Ready: delegates to HandleReadyInput to write frames into the output.
//  - Exhausted: nothing more to produce.
//
// Returns populated_any, i.e. the OR of the values returned by the HandleReadyInput calls made during
// this invocation.
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("OptionalMF");
// Creates own_multi_frame_ and optional_multi_frame_ on the first call.
EnsureOwnMultiFramesAreGood(output_multi_frame);
auto populated_any{false};
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
auto output_frames_it = output_frames_populator.begin();
while (true) {
switch (input_state_) {
case State::Pull: {
MG_ASSERT(optional_state_ != State::Ready, "Unexpected state");
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
// No more input: both branches are finished.
input_state_ = State::Exhausted;
optional_state_ = State::Exhausted;
} else {
input_state_ = State::Ready;
optional_state_ = State::Pull;
// Number the valid input frames 0, 1, 2, ... in iteration order. HandleReadyInput compares these ids
// with the ids of the frames coming out of the optional branch.
uint64_t frame_id{0U};
for (auto &frame : own_multi_frame_->GetValidFramesModifier()) {
frame.SetId(frame_id++);
}
last_matched_frame_ = 0U;
// Restart the optional branch and hand it the freshly pulled input frames.
optional_cursor_->Reset();
optional_cursor_->PushDown(*own_multi_frame_);
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
own_frames_it_ = own_frames_consumer_->begin();
}
break;
}
case State::Ready: {
populated_any |= HandleReadyInput(output_frames_populator, output_frames_it, context);
// The output is full: return and continue from the stored state on the next call.
if (output_frames_it == output_frames_populator.end()) {
return populated_any;
}
// Every input frame of the current batch was consumed: pull the next batch.
if (own_frames_it_ == own_frames_consumer_->end()) {
input_state_ = State::Pull;
}
break;
}
case State::Exhausted: {
MG_ASSERT(optional_state_ == State::Exhausted);
return populated_any;
}
}
}
}
// Shuts down both child cursors: the input cursor first, then the optional cursor.
void Shutdown() override {
input_cursor_->Shutdown();
optional_cursor_->Shutdown();
}
void Reset() override {
// TODO(antaljanosbenjamin)
input_cursor_->Reset();
optional_cursor_->Reset();
pull_input_ = true;
}
private:
enum class State { Pull, Ready, Exhausted };
/// Creates `own_multi_frame_` and `optional_multi_frame_` on first use, sized after the first frame of
/// `output_multi_frame` and allocated from its memory resource, and sets up the consumer and iterator over
/// `own_multi_frame_`. On every call it asserts that the output frames and the own frames have the same
/// number of elements.
void EnsureOwnMultiFramesAreGood(MultiFrame &output_multi_frame) {
  if (!own_multi_frame_) {
    const auto frame_width = output_multi_frame.GetFirstFrame().Elems().size();

    own_multi_frame_.emplace(
        MultiFrame(frame_width, FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
    own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
    own_frames_it_ = own_frames_consumer_->begin();

    optional_multi_frame_.emplace(
        MultiFrame(frame_width, FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
  }
  MG_ASSERT(output_multi_frame.GetFirstFrame().Elems().size() == own_multi_frame_->GetFirstFrame().Elems().size());
}
const Optional &self_;
const UniqueCursorPtr input_cursor_;
const UniqueCursorPtr optional_cursor_;
State input_state_{State::Pull};
State optional_state_{State::Pull};
std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> own_frames_consumer_;
ValidFramesConsumer::Iterator own_frames_it_;
std::optional<MultiFrame> optional_multi_frame_;
std::optional<ValidFramesConsumer> optional_frames_consumer_;
ValidFramesConsumer::Iterator optional_frames_it_;
uint64_t last_matched_frame_{0U};
// indicates if the next Pull from this cursor should
// perform a Pull from the input_cursor_
// this is true when:
// - first pulling from this Cursor
// - previous Pull from this cursor exhausted the optional_cursor_
bool pull_input_{true};
};
Unwind::Unwind(const std::shared_ptr<LogicalOperator> &input, Expression *input_expression, Symbol output_symbol)
: input_(input ? input : std::make_shared<Once>()),
@ -2212,7 +2390,7 @@ class UnwindCursor : public Cursor {
SCOPED_PROFILE_OP("UnwindMF");
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
@ -2477,7 +2655,7 @@ class CartesianCursor : public Cursor {
if (!cartesian_pull_initialized_) {
// Pull all left_op frames.
while (left_op_cursor_->Pull(frame, context)) {
left_op_frames_.emplace_back(frame.elems().begin(), frame.elems().end());
left_op_frames_.emplace_back(frame.Elems().begin(), frame.Elems().end());
}
// We're setting the iterator to 'end' here so it pulls the right
@ -2501,7 +2679,7 @@ class CartesianCursor : public Cursor {
// Advance right_op_cursor_.
if (!right_op_cursor_->Pull(frame, context)) return false;
right_op_frame_.assign(frame.elems().begin(), frame.elems().end());
right_op_frame_.assign(frame.Elems().begin(), frame.Elems().end());
left_op_frames_it_ = left_op_frames_.begin();
} else {
// Make sure right_op_cursor last pulled results are on frame.
@ -3215,7 +3393,8 @@ class DistributedExpandCursor : public Cursor {
void InitEdgesMultiple() {
// This function won't work if any vertex id is duplicated in the input, because:
// 1. vertex_id_to_result_row is not a multimap
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input vertex on a
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input
// vertex on a
// later frame
const auto &frame = (*own_frames_it_);
const auto &vertex_value = frame[self_.input_symbol_];
@ -3375,14 +3554,16 @@ class DistributedExpandCursor : public Cursor {
return populated_any;
}
/// Forwards the pushed-down multi frame unchanged to the input cursor.
void PushDown(const MultiFrame &multi_frame) override {
  input_cursor_->PushDown(multi_frame);
}
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
if (!own_multi_frame_.has_value()) {
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().Elems().size(),
FLAGS_default_multi_frame_size, output_multi_frame.GetMemoryResource()));
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
}
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
MG_ASSERT(output_multi_frame.GetFirstFrame().Elems().size() == own_multi_frame_->GetFirstFrame().Elems().size());
}
/// Shuts down the input cursor; this cursor does nothing else on shutdown.
void Shutdown() override {
  input_cursor_->Shutdown();
}
@ -3393,7 +3574,9 @@ class DistributedExpandCursor : public Cursor {
result_rows_.clear();
own_frames_it_ = ValidFramesConsumer::Iterator{};
own_frames_consumer_.reset();
own_multi_frame_->MakeAllFramesInvalid();
if (own_multi_frame_.has_value()) {
own_multi_frame_->MakeAllFramesInvalid();
}
state_ = State::PullInputAndEdges;
current_in_edges_.clear();

View File

@ -88,6 +88,8 @@ class Cursor {
/// @throws QueryRuntimeException if something went wrong with execution
virtual bool PullMultiple(MultiFrame &, ExecutionContext &) {MG_ASSERT(false, "PullMultipleIsNotImplemented"); return false; }
/// Default implementation for cursors that do not support push-down: always fails the assertion.
/// Cursors that accept a pushed-down multi frame override this.
virtual void PushDown(const MultiFrame &) {
  MG_ASSERT(false, "PushDownIsNotImplemented");
}
/// Resets the Cursor to its initial state.
virtual void Reset() = 0;
@ -350,11 +352,13 @@ and false on every following Pull.")
public:
OnceCursor() {}
bool PullMultiple(MultiFrame &, ExecutionContext &) override;
void PushDown(const MultiFrame&) override;
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
std::optional<MultiFrame> pushed_down_multi_frame_;
bool did_pull_{false};
};
cpp<#)
@ -1967,27 +1971,6 @@ and returns true, once.")
input_ = input;
}
cpp<#)
(:private
#>cpp
class OptionalCursor : public Cursor {
public:
OptionalCursor(const Optional &, utils::MemoryResource *);
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
const Optional &self_;
const UniqueCursorPtr input_cursor_;
const UniqueCursorPtr optional_cursor_;
// indicates if the next Pull from this cursor should
// perform a Pull from the input_cursor_
// this is true when:
// - first pulling from this Cursor
// - previous Pull from this cursor exhausted the optional_cursor_
bool pull_input_{true};
};
cpp<#)
(:serialize (:slk))
(:clone))

View File

@ -31,8 +31,8 @@ MultiFrame CreateMultiFrame(const size_t max_pos, const Symbol &src, const Symbo
auto frames_populator = multi_frame.GetInvalidFramesPopulator();
size_t i = 0;
for (auto &frame : frames_populator) {
auto &src_acc = frame.at(src);
auto &dst_acc = frame.at(dst);
auto &src_acc = frame.At(src);
auto &dst_acc = frame.At(dst);
auto v1 = msgs::Vertex{.id = {{msgs::LabelId::FromUint(1)}, {msgs::Value(static_cast<int64_t>(i++))}}};
auto v2 = msgs::Vertex{.id = {{msgs::LabelId::FromUint(1)}, {msgs::Value(static_cast<int64_t>(i++))}}};
std::map<msgs::PropertyId, msgs::Value> mp;