Use utils::MonotonicBufferResource in query execution
Reviewers: mferencevic, mtomic, msantl Reviewed By: mferencevic Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2016
This commit is contained in:
parent
c4725bcf99
commit
2909ef63d2
@ -18,7 +18,9 @@ ProduceRpcServer::OngoingProduce::OngoingProduce(
|
||||
context_{dba_.get()},
|
||||
pull_symbols_(std::move(pull_symbols)),
|
||||
frame_(plan_pack.symbol_table.max_position()),
|
||||
cursor_(plan_pack.plan->MakeCursor(*dba_)) {
|
||||
execution_memory_(std::make_unique<utils::MonotonicBufferResource>(
|
||||
query::kExecutionMemoryBlockSize)),
|
||||
cursor_(plan_pack.plan->MakeCursor(dba_.get(), execution_memory_.get())) {
|
||||
context_.symbol_table = plan_pack.symbol_table;
|
||||
context_.evaluation_context.timestamp = timestamp;
|
||||
context_.evaluation_context.parameters = parameters;
|
||||
|
@ -65,6 +65,9 @@ class ProduceRpcServer {
|
||||
query::Frame frame_;
|
||||
PullState cursor_state_{PullState::CURSOR_IN_PROGRESS};
|
||||
std::vector<std::vector<query::TypedValue>> accumulation_;
|
||||
// execution_memory_ is unique_ptr because we are passing the address to
|
||||
// cursor_, and we want to preserve the pointer in case we get moved.
|
||||
std::unique_ptr<utils::MonotonicBufferResource> execution_memory_;
|
||||
std::unique_ptr<query::plan::Cursor> cursor_;
|
||||
|
||||
/// Pulls and returns a single result from the cursor.
|
||||
|
@ -7,6 +7,8 @@
|
||||
|
||||
namespace query {
|
||||
|
||||
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
|
||||
|
||||
struct EvaluationContext {
|
||||
int64_t timestamp{-1};
|
||||
Parameters parameters;
|
||||
|
@ -871,8 +871,9 @@ Interpreter::Results Interpreter::operator()(
|
||||
auto output_plan = std::make_unique<plan::OutputTable>(
|
||||
output_symbols,
|
||||
[cypher_query_plan](Frame *frame, ExecutionContext *context) {
|
||||
auto cursor =
|
||||
cypher_query_plan->plan().MakeCursor(*context->db_accessor);
|
||||
utils::MonotonicBufferResource execution_memory(1 * 1024 * 1024);
|
||||
auto cursor = cypher_query_plan->plan().MakeCursor(
|
||||
context->db_accessor, &execution_memory);
|
||||
|
||||
// We are pulling from another plan, so set up the EvaluationContext
|
||||
// correctly. The rest of the context should be good for sharing.
|
||||
|
@ -93,7 +93,10 @@ class Interpreter {
|
||||
bool is_profile_query = false, bool should_abort_query = false)
|
||||
: ctx_{db_accessor},
|
||||
plan_(plan),
|
||||
cursor_(plan_->plan().MakeCursor(*db_accessor)),
|
||||
execution_memory_(std::make_unique<utils::MonotonicBufferResource>(
|
||||
kExecutionMemoryBlockSize)),
|
||||
cursor_(
|
||||
plan_->plan().MakeCursor(db_accessor, execution_memory_.get())),
|
||||
frame_(plan_->symbol_table().max_position()),
|
||||
output_symbols_(output_symbols),
|
||||
header_(header),
|
||||
@ -176,6 +179,9 @@ class Interpreter {
|
||||
private:
|
||||
ExecutionContext ctx_;
|
||||
std::shared_ptr<CachedPlan> plan_;
|
||||
// execution_memory_ is unique_ptr because we are passing the address to
|
||||
// cursor_, and we want to preserve the pointer in case we get moved.
|
||||
std::unique_ptr<utils::MonotonicBufferResource> execution_memory_;
|
||||
std::unique_ptr<query::plan::Cursor> cursor_;
|
||||
Frame frame_;
|
||||
std::vector<Symbol> output_symbols_;
|
||||
|
@ -145,10 +145,6 @@ can serve as inputs to others and thus a sequence of operations is formed.")
|
||||
virtual std::unique_ptr<Cursor> MakeCursor(
|
||||
database::GraphDbAccessor *, utils::MemoryResource *) const = 0;
|
||||
|
||||
std::unique_ptr<Cursor> MakeCursor(database::GraphDbAccessor &dba) const {
|
||||
return MakeCursor(&dba, utils::NewDeleteResource());
|
||||
}
|
||||
|
||||
/** Return @c Symbol vector where the query results will be stored.
|
||||
*
|
||||
* Currently, output symbols are generated in @c Produce and @c Union
|
||||
|
@ -52,7 +52,8 @@ static void DistinctDefaultAllocator(benchmark::State &state) {
|
||||
while (state.KeepRunning()) {
|
||||
query::ExecutionContext execution_context{&dba, symbol_table,
|
||||
evaluation_context};
|
||||
auto cursor = plan_and_cost.first->MakeCursor(dba);
|
||||
auto cursor =
|
||||
plan_and_cost.first->MakeCursor(&dba, utils::NewDeleteResource());
|
||||
while (cursor->Pull(frame, execution_context))
|
||||
;
|
||||
}
|
||||
|
@ -173,7 +173,8 @@ class Yield : public query::plan::LogicalOperator {
|
||||
std::vector<std::vector<query::TypedValue>> PullResults(
|
||||
query::plan::LogicalOperator *last_op, query::ExecutionContext *context,
|
||||
std::vector<query::Symbol> output_symbols) {
|
||||
auto cursor = last_op->MakeCursor(*context->db_accessor);
|
||||
auto cursor =
|
||||
last_op->MakeCursor(context->db_accessor, utils::NewDeleteResource());
|
||||
std::vector<std::vector<query::TypedValue>> output;
|
||||
{
|
||||
query::Frame frame(context->symbol_table.max_position());
|
||||
|
@ -21,7 +21,7 @@ TEST_F(DistributedReset, ResetTest) {
|
||||
query::Frame frame(0);
|
||||
query::ExecutionContext context{dba.get()};
|
||||
auto pull_remote_cursor =
|
||||
pull_remote->query::plan::LogicalOperator::MakeCursor(*dba);
|
||||
pull_remote->MakeCursor(dba.get(), utils::NewDeleteResource());
|
||||
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
EXPECT_TRUE(pull_remote_cursor->Pull(frame, context));
|
||||
|
@ -43,7 +43,7 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce,
|
||||
|
||||
// stream out results
|
||||
auto cursor =
|
||||
produce.query::plan::LogicalOperator::MakeCursor(*context->db_accessor);
|
||||
produce.MakeCursor(context->db_accessor, utils::NewDeleteResource());
|
||||
std::vector<std::vector<TypedValue>> results;
|
||||
while (cursor->Pull(frame, *context)) {
|
||||
std::vector<TypedValue> values;
|
||||
@ -56,7 +56,8 @@ std::vector<std::vector<TypedValue>> CollectProduce(const Produce &produce,
|
||||
|
||||
int PullAll(const LogicalOperator &logical_op, ExecutionContext *context) {
|
||||
Frame frame(context->symbol_table.max_position());
|
||||
auto cursor = logical_op.MakeCursor(*context->db_accessor);
|
||||
auto cursor =
|
||||
logical_op.MakeCursor(context->db_accessor, utils::NewDeleteResource());
|
||||
int count = 0;
|
||||
while (cursor->Pull(frame, *context)) count++;
|
||||
return count;
|
||||
|
@ -280,8 +280,8 @@ TEST(QueryPlan, Delete) {
|
||||
n.op_, std::vector<Expression *>{n_get}, true);
|
||||
Frame frame(symbol_table.max_position());
|
||||
auto context = MakeContext(storage, symbol_table, &dba);
|
||||
delete_op->query::plan::LogicalOperator::MakeCursor(dba)->Pull(frame,
|
||||
context);
|
||||
delete_op->MakeCursor(&dba, utils::NewDeleteResource())
|
||||
->Pull(frame, context);
|
||||
dba.AdvanceCommand();
|
||||
EXPECT_EQ(3, CountIterable(dba.Vertices(false)));
|
||||
EXPECT_EQ(3, CountIterable(dba.Edges(false)));
|
||||
|
@ -586,7 +586,7 @@ class QueryPlanExpandVariable : public testing::Test {
|
||||
template <typename TResult>
|
||||
auto GetResults(std::shared_ptr<LogicalOperator> input_op, Symbol symbol) {
|
||||
Frame frame(symbol_table.max_position());
|
||||
auto cursor = input_op->MakeCursor(dba_);
|
||||
auto cursor = input_op->MakeCursor(&dba_, utils::NewDeleteResource());
|
||||
auto context = MakeContext(storage, symbol_table, &dba_);
|
||||
std::vector<TResult> results;
|
||||
while (cursor->Pull(frame, context))
|
||||
@ -884,7 +884,7 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test {
|
||||
total_weight);
|
||||
|
||||
Frame frame(symbol_table.max_position());
|
||||
auto cursor = last_op->MakeCursor(dba);
|
||||
auto cursor = last_op->MakeCursor(&dba, utils::NewDeleteResource());
|
||||
std::vector<ResultType> results;
|
||||
auto context = MakeContext(storage, symbol_table, &dba);
|
||||
while (cursor->Pull(frame, context)) {
|
||||
|
Loading…
Reference in New Issue
Block a user