Init POC of ScanByPrimaryKey multiframe
This commit is contained in:
parent
f67422f8b9
commit
436e41f71f
@ -812,7 +812,8 @@ std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::PullMultiple(AnyStrea
|
|||||||
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n,
|
||||||
const std::vector<Symbol> &output_symbols,
|
const std::vector<Symbol> &output_symbols,
|
||||||
std::map<std::string, TypedValue> *summary) {
|
std::map<std::string, TypedValue> *summary) {
|
||||||
auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
// auto should_pull_multiple = false; // TODO on the long term, we will only use PullMultiple
|
||||||
|
auto should_pull_multiple = true;
|
||||||
if (should_pull_multiple) {
|
if (should_pull_multiple) {
|
||||||
return PullMultiple(stream, n, output_symbols, summary);
|
return PullMultiple(stream, n, output_symbols, summary);
|
||||||
}
|
}
|
||||||
|
@ -603,8 +603,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
|||||||
filter_expressions_(filter_expressions),
|
filter_expressions_(filter_expressions),
|
||||||
primary_key_(primary_key) {}
|
primary_key_(primary_key) {}
|
||||||
|
|
||||||
enum class State : int8_t { INITIALIZING, COMPLETED };
|
|
||||||
|
|
||||||
using VertexAccessor = accessors::VertexAccessor;
|
using VertexAccessor = accessors::VertexAccessor;
|
||||||
|
|
||||||
std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router,
|
std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router,
|
||||||
@ -637,6 +635,83 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
|||||||
return VertexAccessor(vertex, properties, &request_router);
|
return VertexAccessor(vertex, properties, &request_router);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO (gvolfing) optinal vs empty vector for signaling failure?
|
||||||
|
bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) {
|
||||||
|
// Evaluate the expressions that hold the PrimaryKey.
|
||||||
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||||
|
storage::v3::View::NEW);
|
||||||
|
|
||||||
|
std::vector<msgs::Value> pk;
|
||||||
|
for (auto *primary_property : primary_key_) {
|
||||||
|
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
|
||||||
|
}
|
||||||
|
|
||||||
|
msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
|
||||||
|
|
||||||
|
msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
|
||||||
|
auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
|
||||||
|
SCOPED_REQUEST_WAIT_PROFILE;
|
||||||
|
return request_router.GetProperties(req);
|
||||||
|
});
|
||||||
|
MG_ASSERT(get_prop_result.size() <= 1);
|
||||||
|
|
||||||
|
// {
|
||||||
|
// SCOPED_REQUEST_WAIT_PROFILE;
|
||||||
|
// std::optional<std::string> request_label = std::nullopt;
|
||||||
|
// if (label_.has_value()) {
|
||||||
|
// request_label = context.request_router->LabelToName(*label_);
|
||||||
|
// }
|
||||||
|
// current_batch_ = context.request_router->ScanVertices(request_label);
|
||||||
|
// }
|
||||||
|
// current_vertex_it_ = current_batch_.begin();
|
||||||
|
// return !current_batch_.empty(
|
||||||
|
|
||||||
|
if (get_prop_result.empty()) {
|
||||||
|
// return std::nullopt;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto properties = get_prop_result[0].props;
|
||||||
|
// TODO (gvolfing) figure out labels when relevant.
|
||||||
|
msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
|
||||||
|
|
||||||
|
current_batch_ = {VertexAccessor(vertex, properties, &request_router)};
|
||||||
|
current_vertex_it_ = current_batch_.begin();
|
||||||
|
return current_batch_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
// std::vector<VertexAccessor> MakeRequest(Frame &frame, RequestRouterInterface &request_router,
|
||||||
|
// ExecutionContext &context) {
|
||||||
|
// std::vector<VertexAccessor> ret;
|
||||||
|
// // Evaluate the expressions that hold the PrimaryKey.
|
||||||
|
// ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||||
|
// storage::v3::View::NEW);
|
||||||
|
|
||||||
|
// std::vector<msgs::Value> pk;
|
||||||
|
// for (auto *primary_property : primary_key_) {
|
||||||
|
// pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
|
||||||
|
// }
|
||||||
|
|
||||||
|
// msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
|
||||||
|
|
||||||
|
// msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
|
||||||
|
// auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
|
||||||
|
// SCOPED_REQUEST_WAIT_PROFILE;
|
||||||
|
// return request_router.GetProperties(req);
|
||||||
|
// });
|
||||||
|
// MG_ASSERT(get_prop_result.size() <= 1);
|
||||||
|
|
||||||
|
// if (get_prop_result.empty()) {
|
||||||
|
// return ret;
|
||||||
|
// }
|
||||||
|
// auto properties = get_prop_result[0].props;
|
||||||
|
// // TODO (gvolfing) figure out labels when relevant.
|
||||||
|
// msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
|
||||||
|
// auto va = VertexAccessor(vertex, properties, &request_router);
|
||||||
|
// ret.push_back(va);
|
||||||
|
// return ret;
|
||||||
|
// }
|
||||||
|
|
||||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||||
SCOPED_PROFILE_OP(op_name_);
|
SCOPED_PROFILE_OP(op_name_);
|
||||||
|
|
||||||
@ -655,17 +730,95 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||||
|
SCOPED_PROFILE_OP(op_name_);
|
||||||
|
|
||||||
|
if (!own_multi_frame_.has_value()) {
|
||||||
|
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
||||||
|
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
||||||
|
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||||
|
own_frames_it_ = own_frames_consumer_->begin();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||||
|
auto populated_any = false;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
switch (state_) {
|
||||||
|
case State::PullInput: {
|
||||||
|
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||||
|
state_ = State::Exhausted;
|
||||||
|
return populated_any;
|
||||||
|
}
|
||||||
|
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||||
|
own_frames_it_ = own_frames_consumer_->begin();
|
||||||
|
state_ = State::FetchVertices;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case State::FetchVertices: {
|
||||||
|
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||||
|
state_ = State::PullInput;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!filter_expressions_->empty() || current_batch_.empty()) {
|
||||||
|
// MakeRequest(context);
|
||||||
|
MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context);
|
||||||
|
} else {
|
||||||
|
// We can reuse the vertices as they don't depend on any value from the frames
|
||||||
|
current_vertex_it_ = current_batch_.begin();
|
||||||
|
}
|
||||||
|
state_ = State::PopulateOutput;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case State::PopulateOutput: {
|
||||||
|
if (!output_multi_frame.HasInvalidFrame()) {
|
||||||
|
return populated_any;
|
||||||
|
}
|
||||||
|
if (current_vertex_it_ == current_batch_.end()) {
|
||||||
|
own_frames_it_->MakeInvalid();
|
||||||
|
++own_frames_it_;
|
||||||
|
state_ = State::FetchVertices;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (auto output_frame_it = output_frames_populator.begin();
|
||||||
|
output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end();
|
||||||
|
++output_frame_it) {
|
||||||
|
auto &output_frame = *output_frame_it;
|
||||||
|
output_frame = *own_frames_it_;
|
||||||
|
output_frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
||||||
|
current_vertex_it_++;
|
||||||
|
populated_any = true;
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case State::Exhausted: {
|
||||||
|
return populated_any;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return populated_any;
|
||||||
|
};
|
||||||
|
|
||||||
void Reset() override { input_cursor_->Reset(); }
|
void Reset() override { input_cursor_->Reset(); }
|
||||||
|
|
||||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted };
|
||||||
|
|
||||||
|
State state_{State::PullInput};
|
||||||
const Symbol output_symbol_;
|
const Symbol output_symbol_;
|
||||||
const UniqueCursorPtr input_cursor_;
|
const UniqueCursorPtr input_cursor_;
|
||||||
const char *op_name_;
|
const char *op_name_;
|
||||||
|
std::vector<VertexAccessor> current_batch_;
|
||||||
|
std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
|
||||||
storage::v3::LabelId label_;
|
storage::v3::LabelId label_;
|
||||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||||
std::vector<Expression *> primary_key_;
|
std::vector<Expression *> primary_key_;
|
||||||
|
std::optional<MultiFrame> own_multi_frame_;
|
||||||
|
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||||
|
ValidFramesConsumer::Iterator own_frames_it_;
|
||||||
};
|
};
|
||||||
|
|
||||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||||
|
Loading…
Reference in New Issue
Block a user