Rework ScanByPrimaryKey operator - multiframe

This commit is contained in:
gvolfing 2023-01-31 17:30:31 +01:00
parent 272e710510
commit 60b71cc2c1

View File

@ -635,83 +635,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
return VertexAccessor(vertex, properties, &request_router);
}
// TODO (gvolfing) optional vs empty vector for signaling failure?
bool MakeRequestSingleFrameTwo(Frame &frame, RequestRouterInterface &request_router, ExecutionContext &context) {
// Evaluate the expressions that hold the PrimaryKey.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
storage::v3::View::NEW);
void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router,
ExecutionContext &context) {
msgs::GetPropertiesRequest req;
const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
std::vector<msgs::Value> pk;
for (auto *primary_property : primary_key_) {
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
std::unordered_set<msgs::VertexId> used_vertex_ids;
for (auto &frame : multi_frame.GetValidFramesModifier()) {
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
storage::v3::View::NEW);
std::vector<msgs::Value> pk;
for (auto *primary_property : primary_key_) {
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
}
auto vertex_id = std::make_pair(label, std::move(pk));
auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id));
if (inserted) {
req.vertex_ids.emplace_back(*it);
}
}
msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return request_router.GetProperties(req);
});
MG_ASSERT(get_prop_result.size() <= 1);
// {
// SCOPED_REQUEST_WAIT_PROFILE;
// std::optional<std::string> request_label = std::nullopt;
// if (label_.has_value()) {
// request_label = context.request_router->LabelToName(*label_);
// }
// current_batch_ = context.request_router->ScanVertices(request_label);
// }
// current_vertex_it_ = current_batch_.begin();
// return !current_batch_.empty(
for (auto &result : get_prop_result) {
auto properties = result.props;
// TODO (gvolfing) figure out labels when relevant.
msgs::Vertex vertex = {.id = result.vertex, .labels = {}};
if (get_prop_result.empty()) {
// return std::nullopt;
return false;
id_to_accessor_mapping_.emplace(result.vertex, VertexAccessor(vertex, properties, &request_router));
}
auto properties = get_prop_result[0].props;
// TODO (gvolfing) figure out labels when relevant.
msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
current_batch_ = {VertexAccessor(vertex, properties, &request_router)};
current_vertex_it_ = current_batch_.begin();
return current_batch_.empty();
}
// std::vector<VertexAccessor> MakeRequest(Frame &frame, RequestRouterInterface &request_router,
// ExecutionContext &context) {
// std::vector<VertexAccessor> ret;
// // Evaluate the expressions that hold the PrimaryKey.
// ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
// storage::v3::View::NEW);
// std::vector<msgs::Value> pk;
// for (auto *primary_property : primary_key_) {
// pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
// }
// msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
// msgs::GetPropertiesRequest req = {.vertex_ids = {std::make_pair(label, pk)}};
// auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
// SCOPED_REQUEST_WAIT_PROFILE;
// return request_router.GetProperties(req);
// });
// MG_ASSERT(get_prop_result.size() <= 1);
// if (get_prop_result.empty()) {
// return ret;
// }
// auto properties = get_prop_result[0].props;
// // TODO (gvolfing) figure out labels when relevant.
// msgs::Vertex vertex = {.id = get_prop_result[0].vertex, .labels = {}};
// auto va = VertexAccessor(vertex, properties, &request_router);
// ret.push_back(va);
// return ret;
// }
bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
@ -730,15 +690,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
return false;
}
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
// Creates this cursor's own MultiFrame on first use and checks that it is
// compatible with the MultiFrame the caller wants populated.
//
// output_multi_frame: the caller-supplied MultiFrame. The element count of its
// first frame and its memory resource are passed to the constructor of
// own_multi_frame_.
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
// Construct only once; on later calls only the assertion below runs.
if (!own_multi_frame_.has_value()) {
// NOTE(review): kNumberOfFramesInMultiframe is presumably the number of frames
// to allocate -- confirm against the MultiFrame constructor.
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
// Bind a consumer to the newly created MultiFrame and position the iterator at
// its beginning.
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
}
// The first frames of both MultiFrames must hold the same number of elements.
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
}
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_);
EnsureOwnMultiFrameIsGood(output_multi_frame);
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
auto populated_any = false;
@ -746,49 +710,58 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
while (true) {
switch (state_) {
case State::PullInput: {
id_to_accessor_mapping_.clear();
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
state_ = State::Exhausted;
return populated_any;
}
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
own_frames_it_ = own_frames_consumer_->begin();
state_ = State::FetchVertices;
break;
}
case State::FetchVertices: {
if (own_frames_it_ == own_frames_consumer_->end()) {
state_ = State::PullInput;
continue;
}
if (!filter_expressions_->empty() || current_batch_.empty()) {
// MakeRequest(context);
MakeRequestSingleFrameTwo(*own_frames_it_, *context.request_router, context);
} else {
// We can reuse the vertices as they don't depend on any value from the frames
current_vertex_it_ = current_batch_.begin();
}
MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context);
state_ = State::PopulateOutput;
break;
}
case State::PopulateOutput: {
if (!output_multi_frame.HasInvalidFrame()) {
if (own_frames_it_ == own_frames_consumer_->end()) {
id_to_accessor_mapping_.clear();
}
return populated_any;
}
if (current_vertex_it_ == current_batch_.end()) {
own_frames_it_->MakeInvalid();
++own_frames_it_;
state_ = State::FetchVertices;
if (own_frames_it_ == own_frames_consumer_->end()) {
state_ = State::PullInput;
continue;
}
for (auto output_frame_it = output_frames_populator.begin();
output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end();
++output_frame_it) {
output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end();
++own_frames_it_) {
auto &output_frame = *output_frame_it;
output_frame = *own_frames_it_;
output_frame[output_symbol_] = TypedValue(*current_vertex_it_);
current_vertex_it_++;
populated_any = true;
ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context,
context.request_router, storage::v3::View::NEW);
std::vector<msgs::Value> pk;
for (auto *primary_property : primary_key_) {
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
}
const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
auto vertex_id = std::make_pair(label, std::move(pk));
if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) {
output_frame = *own_frames_it_;
output_frame[output_symbol_] = TypedValue(it->second);
populated_any = true;
++output_frame_it;
}
}
break;
}
@ -805,20 +778,19 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
void Shutdown() override { input_cursor_->Shutdown(); }
private:
enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted };
enum class State { PullInput, PopulateOutput, Exhausted };
State state_{State::PullInput};
const Symbol output_symbol_;
const UniqueCursorPtr input_cursor_;
const char *op_name_;
std::vector<VertexAccessor> current_batch_;
std::vector<VertexAccessor>::iterator current_vertex_it_{current_batch_.begin()};
storage::v3::LabelId label_;
std::optional<std::vector<Expression *>> filter_expressions_;
std::vector<Expression *> primary_key_;
std::optional<MultiFrame> own_multi_frame_;
std::optional<ValidFramesConsumer> own_frames_consumer_;
ValidFramesConsumer::Iterator own_frames_it_;
std::unordered_map<msgs::VertexId, VertexAccessor> id_to_accessor_mapping_;
};
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)