From 436e41f71ff5f5bb5ce6ca635d1251500641b466 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Mon, 30 Jan 2023 13:06:05 +0100 Subject: [PATCH 1/6] Init POC of ScanByPrimaryKey multiframe --- src/query/v2/interpreter.cpp | 3 +- src/query/v2/plan/operator.cpp | 157 ++++++++++++++++++++++++++++++++- 2 files changed, 157 insertions(+), 3 deletions(-) diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index aa220d764..fe7a17bf7 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -812,7 +812,8 @@ std::optional PullPlan::PullMultiple(AnyStrea std::optional PullPlan::Pull(AnyStream *stream, std::optional n, const std::vector &output_symbols, std::map *summary) { - auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + // auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple + auto should_pull_multiple = true; if (should_pull_multiple) { return PullMultiple(stream, n, output_symbols, summary); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 50e5d4fca..c9bd2e9d0 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -603,8 +603,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { filter_expressions_(filter_expressions), primary_key_(primary_key) {} - enum class State : int8_t { INITIALIZING, COMPLETED }; - using VertexAccessor = accessors::VertexAccessor; std::optional MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router, @@ -637,6 +635,83 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return VertexAccessor(vertex, properties, &request_router); } + // TODO (gvolfing) optinal vs empty vector for signaling failure? + bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) { + // Evaluate the expressions that hold the PrimaryKey. + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + + msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; + auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return request_router.GetProperties(req); + }); + MG_ASSERT(get_prop_result.size() <= 1); + + // { + // SCOPED_REQUEST_WAIT_PROFILE; + // std::optional request_label = std::nullopt; + // if (label_.has_value()) { + // request_label = context.request_router->LabelToName(*label_); + // } + // current_batch_ = context.request_router->ScanVertices(request_label); + // } + // current_vertex_it_ = current_batch_.begin(); + // return !current_batch_.empty( + + if (get_prop_result.empty()) { + // return std::nullopt; + return false; + } + + auto properties = get_prop_result[0].props; + // TODO (gvolfing) figure out labels when relevant. + msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; + + current_batch_ = {VertexAccessor(vertex, properties, &request_router)}; + current_vertex_it_ = current_batch_.begin(); + return current_batch_.empty(); + } + + // std::vector MakeRequest(Frame &frame, RequestRouterInterface &request_router, + // ExecutionContext &context) { + // std::vector ret; + // // Evaluate the expressions that hold the PrimaryKey. + // ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + // storage::v3::View::NEW); + + // std::vector pk; + // for (auto *primary_property : primary_key_) { + // pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + // } + + // msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + + // msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; + // auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { + // SCOPED_REQUEST_WAIT_PROFILE; + // return request_router.GetProperties(req); + // }); + // MG_ASSERT(get_prop_result.size() <= 1); + + // if (get_prop_result.empty()) { + // return ret; + // } + // auto properties = get_prop_result[0].props; + // // TODO (gvolfing) figure out labels when relevant. + // msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; + // auto va = VertexAccessor(vertex, properties, &request_router); + // ret.push_back(va); + // return ret; + // } + bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); @@ -655,17 +730,95 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return false; } + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + auto populated_any = false; + + while (true) { + switch (state_) { + case State::PullInput: { + if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { + state_ = State::Exhausted; + return populated_any; + } + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + state_ = State::FetchVertices; + break; + } + case State::FetchVertices: { + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; + continue; + } + if (!filter_expressions_->empty() || current_batch_.empty()) { + // MakeRequest(context); + MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context); + } else { + // We can reuse the vertices as they don't depend on any value from the frames + current_vertex_it_ = current_batch_.begin(); + } + state_ = State::PopulateOutput; + break; + } + case State::PopulateOutput: { + if (!output_multi_frame.HasInvalidFrame()) { + return populated_any; + } + if (current_vertex_it_ == current_batch_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + state_ = State::FetchVertices; + continue; + } + + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end(); + ++output_frame_it) { + auto &output_frame = *output_frame_it; + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(*current_vertex_it_); + current_vertex_it_++; + populated_any = true; + } + break; + } + case State::Exhausted: { + return populated_any; + } + } + } + return populated_any; + }; + void Reset() override { input_cursor_->Reset(); } void Shutdown() override { input_cursor_->Shutdown(); } private: + enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted }; + + State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; + std::vector current_batch_; + std::vector::iterator current_vertex_it_{current_batch_.begin()}; storage::v3::LabelId label_; std::optional> filter_expressions_; std::vector primary_key_; + std::optional own_multi_frame_; + std::optional own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view) From 60b71cc2c107eb63882cd3c1cc9616cf204b32ea Mon Sep 17 00:00:00 2001 From: gvolfing Date: Tue, 31 Jan 2023 17:30:31 +0100 Subject: [PATCH 2/6] Rework ScanByPrimaryKey operator - multiframe --- src/query/v2/plan/operator.cpp | 158 ++++++++++++++------------------- 1 file changed, 65 insertions(+), 93 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index ad1674d29..20425de6a 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -635,83 +635,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return VertexAccessor(vertex, properties, &request_router); } - // TODO (gvolfing) optinal vs empty vector for signaling failure? - bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) { - // Evaluate the expressions that hold the PrimaryKey. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - storage::v3::View::NEW); + void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router, + ExecutionContext &context) { + msgs::GetPropertiesRequest req; + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - std::vector pk; - for (auto *primary_property : primary_key_) { - pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + std::unordered_set used_vertex_ids; + + for (auto &frame : multi_frame.GetValidFramesModifier()) { + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + auto vertex_id = std::make_pair(label, std::move(pk)); + auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id)); + if (inserted) { + req.vertex_ids.emplace_back(*it); + } } - msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - - msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { SCOPED_REQUEST_WAIT_PROFILE; return request_router.GetProperties(req); }); - MG_ASSERT(get_prop_result.size() <= 1); - // { - // SCOPED_REQUEST_WAIT_PROFILE; - // std::optional request_label = std::nullopt; - // if (label_.has_value()) { - // request_label = context.request_router->LabelToName(*label_); - // } - // current_batch_ = context.request_router->ScanVertices(request_label); - // } - // current_vertex_it_ = current_batch_.begin(); - // return !current_batch_.empty( + for (auto &result : get_prop_result) { + auto properties = result.props; + // TODO (gvolfing) figure out labels when relevant. + msgs::Vertex vertex = {.id = result.vertex, .labels = {}}; - if (get_prop_result.empty()) { - // return std::nullopt; - return false; + id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router)); } - - auto properties = get_prop_result[0].props; - // TODO (gvolfing) figure out labels when relevant. - msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; - - current_batch_ = {VertexAccessor(vertex, properties, &request_router)}; - current_vertex_it_ = current_batch_.begin(); - return current_batch_.empty(); } - // std::vector MakeRequest(Frame &frame, RequestRouterInterface &request_router, - // ExecutionContext &context) { - // std::vector ret; - // // Evaluate the expressions that hold the PrimaryKey. - // ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, - // storage::v3::View::NEW); - - // std::vector pk; - // for (auto *primary_property : primary_key_) { - // pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); - // } - - // msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; - - // msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}}; - // auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable { - // SCOPED_REQUEST_WAIT_PROFILE; - // return request_router.GetProperties(req); - // }); - // MG_ASSERT(get_prop_result.size() <= 1); - - // if (get_prop_result.empty()) { - // return ret; - // } - // auto properties = get_prop_result[0].props; - // // TODO (gvolfing) figure out labels when relevant. - // msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}}; - // auto va = VertexAccessor(vertex, properties, &request_router); - // ret.push_back(va); - // return ret; - // } - bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); @@ -730,15 +690,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { return false; } - bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { - SCOPED_PROFILE_OP(op_name_); - + void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) { if (!own_multi_frame_.has_value()) { own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); own_frames_it_ = own_frames_consumer_->begin(); } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + } + + bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP(op_name_); + EnsureOwnMultiFrameIsGood(output_multi_frame); auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); auto populated_any = false; @@ -746,49 +710,58 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { while (true) { switch (state_) { case State::PullInput: { + id_to_accessor_mapping_.clear(); if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) { state_ = State::Exhausted; return populated_any; } own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); own_frames_it_ = own_frames_consumer_->begin(); - state_ = State::FetchVertices; - break; - } - case State::FetchVertices: { + if (own_frames_it_ == own_frames_consumer_->end()) { - state_ = State::PullInput; continue; } - if (!filter_expressions_->empty() || current_batch_.empty()) { - // MakeRequest(context); - MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context); - } else { - // We can reuse the vertices as they don't depend on any value from the frames - current_vertex_it_ = current_batch_.begin(); - } + + MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context); + state_ = State::PopulateOutput; break; } case State::PopulateOutput: { if (!output_multi_frame.HasInvalidFrame()) { + if (own_frames_it_ == own_frames_consumer_->end()) { + id_to_accessor_mapping_.clear(); + } return populated_any; } - if (current_vertex_it_ == current_batch_.end()) { - own_frames_it_->MakeInvalid(); - ++own_frames_it_; - state_ = State::FetchVertices; + + if (own_frames_it_ == own_frames_consumer_->end()) { + state_ = State::PullInput; continue; } for (auto output_frame_it = output_frames_populator.begin(); - output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end(); - ++output_frame_it) { + output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end(); + ++own_frames_it_) { auto &output_frame = *output_frame_it; - output_frame = *own_frames_it_; - output_frame[output_symbol_] = TypedValue(*current_vertex_it_); - current_vertex_it_++; - populated_any = true; + + ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context, + context.request_router, storage::v3::View::NEW); + + std::vector pk; + for (auto *primary_property : primary_key_) { + pk.push_back(TypedValueToValue(primary_property->Accept(evaluator))); + } + + const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())}; + auto vertex_id = std::make_pair(label, std::move(pk)); + + if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) { + output_frame = *own_frames_it_; + output_frame[output_symbol_] = TypedValue(it->second); + populated_any = true; + ++output_frame_it; + } } break; } @@ -805,20 +778,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { void Shutdown() override { input_cursor_->Shutdown(); } private: - enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted }; + enum class State { PullInput, PopulateOutput, Exhausted }; State state_{State::PullInput}; const Symbol output_symbol_; const UniqueCursorPtr input_cursor_; const char *op_name_; - std::vector current_batch_; - std::vector::iterator current_vertex_it_{current_batch_.begin()}; storage::v3::LabelId label_; std::optional> filter_expressions_; std::vector primary_key_; std::optional own_multi_frame_; std::optional own_frames_consumer_; ValidFramesConsumer::Iterator own_frames_it_; + std::unordered_map id_to_accessor_mapping_; }; ScanAll::ScanAll(const std::shared_ptr &input, Symbol output_symbol, storage::v3::View view) From 7d63236f871ec60d94c1dff4794ef028783456a6 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Tue, 31 Jan 2023 17:36:52 +0100 Subject: [PATCH 3/6] Set the default pull-mechanism back to single-pull --- src/query/v2/interpreter.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index fe7a17bf7..aa220d764 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -812,8 +812,7 @@ std::optional PullPlan::PullMultiple(AnyStrea std::optional PullPlan::Pull(AnyStream *stream, std::optional n, const std::vector &output_symbols, std::map *summary) { - // auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple - auto should_pull_multiple = true; + auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple if (should_pull_multiple) { return PullMultiple(stream, n, output_symbols, summary); } From 4be4a86d0a932a2242c15967844d9ce3eb1c1676 Mon Sep 17 00:00:00 2001 From: gvolfing <107616712+gvolfing@users.noreply.github.com> Date: Wed, 1 Feb 2023 11:39:48 +0100 Subject: [PATCH 4/6] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: János Benjamin Antal --- src/query/v2/plan/operator.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 20425de6a..4a512fcd6 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -664,11 +664,10 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { }); for (auto &result : get_prop_result) { - auto properties = result.props; // TODO (gvolfing) figure out labels when relevant. msgs::Vertex vertex = {.id = result.vertex, .labels = {}}; - id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router)); + id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(std::move(vertex), std::move(result.properties), &request_router)); } } From bf93b53e7dcec4bbad54fe17968ce2d16b499a66 Mon Sep 17 00:00:00 2001 From: gvolfing Date: Wed, 1 Feb 2023 12:36:27 +0100 Subject: [PATCH 5/6] Fix compile error due to wrong aggregate field name --- src/query/v2/plan/operator.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 4a512fcd6..86b804db2 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -667,7 +667,8 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { // TODO (gvolfing) figure out labels when relevant. msgs::Vertex vertex = {.id = result.vertex, .labels = {}}; - id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(std::move(vertex), std::move(result.properties), &request_router)); + id_to_accessor_mapping_.emplace(result.vertex, + VertexAccessor(std::move(vertex), std::move(result.props), &request_router)); } } From 8e315875f2f5aa08c60cc27dee29df5195399220 Mon Sep 17 00:00:00 2001 From: gvolfing <107616712+gvolfing@users.noreply.github.com> Date: Thu, 2 Feb 2023 07:44:47 +0100 Subject: [PATCH 6/6] Update src/query/v2/plan/operator.cpp MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: János Benjamin Antal --- src/query/v2/plan/operator.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 86b804db2..1139c0fbd 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -761,7 +761,8 @@ class DistributedScanByPrimaryKeyCursor : public Cursor { output_frame[output_symbol_] = TypedValue(it->second); populated_any = true; ++output_frame_it; - } + } + own_frames_it_->MakeInvalid(); } break; }