Move OrderByCursor from LCP to CPP

Reviewers: mtomic, llugovic

Reviewed By: mtomic

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2060
This commit is contained in:
Teon Banek 2019-05-16 14:35:41 +02:00
parent 0075eee58b
commit abbb57c868
2 changed files with 68 additions and 78 deletions

View File

@ -2852,11 +2852,6 @@ OrderBy::OrderBy(const std::shared_ptr<LogicalOperator> &input,
ACCEPT_WITH_INPUT(OrderBy)
UniqueCursorPtr OrderBy::MakeCursor(database::GraphDbAccessor *db,
utils::MemoryResource *mem) const {
return MakeUniqueCursorPtr<OrderByCursor>(mem, *this, db, mem);
}
std::vector<Symbol> OrderBy::OutputSymbols(
const SymbolTable &symbol_table) const {
// Propagate this to potential Produce.
@ -2867,67 +2862,86 @@ std::vector<Symbol> OrderBy::ModifiedSymbols(const SymbolTable &table) const {
return input_->ModifiedSymbols(table);
}
OrderBy::OrderByCursor::OrderByCursor(const OrderBy &self,
database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
class OrderByCursor : public Cursor {
public:
OrderByCursor(const OrderBy &self, database::GraphDbAccessor *db,
utils::MemoryResource *mem)
: self_(self), input_cursor_(self_.input_->MakeCursor(db, mem)) {}
bool OrderBy::OrderByCursor::Pull(Frame &frame, ExecutionContext &context) {
SCOPED_PROFILE_OP("OrderBy");
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("OrderBy");
if (!did_pull_all_) {
ExpressionEvaluator evaluator(&frame, context.symbol_table,
context.evaluation_context,
context.db_accessor, GraphView::OLD);
while (input_cursor_->Pull(frame, context)) {
// collect the order_by elements
std::vector<TypedValue> order_by;
order_by.reserve(self_.order_by_.size());
for (auto expression_ptr : self_.order_by_) {
order_by.emplace_back(expression_ptr->Accept(evaluator));
if (!did_pull_all_) {
ExpressionEvaluator evaluator(&frame, context.symbol_table,
context.evaluation_context,
context.db_accessor, GraphView::OLD);
while (input_cursor_->Pull(frame, context)) {
// collect the order_by elements
std::vector<TypedValue> order_by;
order_by.reserve(self_.order_by_.size());
for (auto expression_ptr : self_.order_by_) {
order_by.emplace_back(expression_ptr->Accept(evaluator));
}
// collect the output elements
std::vector<TypedValue> output;
output.reserve(self_.output_symbols_.size());
for (const Symbol &output_sym : self_.output_symbols_)
output.emplace_back(frame[output_sym]);
cache_.emplace_back(std::move(order_by), std::move(output));
}
// collect the output elements
std::vector<TypedValue> output;
output.reserve(self_.output_symbols_.size());
for (const Symbol &output_sym : self_.output_symbols_)
output.emplace_back(frame[output_sym]);
std::sort(cache_.begin(), cache_.end(),
[this](const auto &pair1, const auto &pair2) {
return self_.compare_(pair1.first, pair2.first);
});
cache_.emplace_back(std::move(order_by), std::move(output));
did_pull_all_ = true;
cache_it_ = cache_.begin();
}
std::sort(cache_.begin(), cache_.end(),
[this](const auto &pair1, const auto &pair2) {
return self_.compare_(pair1.first, pair2.first);
});
if (cache_it_ == cache_.end()) return false;
did_pull_all_ = true;
if (context.db_accessor->should_abort()) throw HintedAbortError();
// place the output values on the frame
DCHECK(self_.output_symbols_.size() == cache_it_->second.size())
<< "Number of values does not match the number of output symbols "
"in OrderBy";
auto output_sym_it = self_.output_symbols_.begin();
for (const TypedValue &output : cache_it_->second)
frame[*output_sym_it++] = output;
cache_it_++;
return true;
}
void Shutdown() override { input_cursor_->Shutdown(); }
void Reset() override {
input_cursor_->Reset();
did_pull_all_ = false;
cache_.clear();
cache_it_ = cache_.begin();
}
if (cache_it_ == cache_.end()) return false;
private:
const OrderBy &self_;
const UniqueCursorPtr input_cursor_;
bool did_pull_all_{false};
// a cache of elements pulled from the input
// first pair element is the order-by vector
// second pair is the remember vector
// the cache is filled and sorted (only on first pair elem) on first Pull
std::vector<std::pair<std::vector<TypedValue>, std::vector<TypedValue>>>
cache_;
// iterator over the cache_, maintains state between Pulls
decltype(cache_.begin()) cache_it_ = cache_.begin();
};
if (context.db_accessor->should_abort()) throw HintedAbortError();
// place the output values on the frame
DCHECK(self_.output_symbols_.size() == cache_it_->second.size())
<< "Number of values does not match the number of output symbols "
"in OrderBy";
auto output_sym_it = self_.output_symbols_.begin();
for (const TypedValue &output : cache_it_->second)
frame[*output_sym_it++] = output;
cache_it_++;
return true;
}
void OrderBy::OrderByCursor::Shutdown() { input_cursor_->Shutdown(); }
void OrderBy::OrderByCursor::Reset() {
input_cursor_->Reset();
did_pull_all_ = false;
cache_.clear();
cache_it_ = cache_.begin();
UniqueCursorPtr OrderBy::MakeCursor(database::GraphDbAccessor *db,
utils::MemoryResource *mem) const {
return MakeUniqueCursorPtr<OrderByCursor>(mem, *this, db, mem);
}
Merge::Merge(const std::shared_ptr<LogicalOperator> &input,

View File

@ -1789,30 +1789,6 @@ are valid for usage after the OrderBy operator.")
input_ = input;
}
cpp<#)
(:private
#>cpp
class OrderByCursor : public Cursor {
public:
OrderByCursor(const OrderBy &, database::GraphDbAccessor *,
utils::MemoryResource *);
bool Pull(Frame &, ExecutionContext &) override;
void Shutdown() override;
void Reset() override;
private:
const OrderBy &self_;
const UniqueCursorPtr input_cursor_;
bool did_pull_all_{false};
// a cache of elements pulled from the input
// first pair element is the order-by vector
// second pair is the remember vector
// the cache is filled and sorted (only on first pair elem) on first Pull
std::vector<std::pair<std::vector<TypedValue>, std::vector<TypedValue>>>
cache_;
// iterator over the cache_, maintains state between Pulls
decltype(cache_.begin()) cache_it_ = cache_.begin();
};
cpp<#)
(:serialize (:slk))
(:clone))