Merge pull request #752 from memgraph/T1226-MG-Implement-scanbyprimarykey-with-multiframe
Implement ScanByPrimaryKey with multiframe
This commit is contained in:
commit
8b392ecc97
@ -603,8 +603,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
||||
filter_expressions_(filter_expressions),
|
||||
primary_key_(primary_key) {}
|
||||
|
||||
enum class State : int8_t { INITIALIZING, COMPLETED };
|
||||
|
||||
using VertexAccessor = accessors::VertexAccessor;
|
||||
|
||||
std::optional<VertexAccessor> MakeRequestSingleFrame(Frame &frame, RequestRouterInterface &request_router,
|
||||
@ -637,6 +635,43 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
||||
return VertexAccessor(vertex, properties, &request_router);
|
||||
}
|
||||
|
||||
void MakeRequestMultiFrame(MultiFrame &multi_frame, RequestRouterInterface &request_router,
|
||||
ExecutionContext &context) {
|
||||
msgs::GetPropertiesRequest req;
|
||||
const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
|
||||
|
||||
std::unordered_set<msgs::VertexId> used_vertex_ids;
|
||||
|
||||
for (auto &frame : multi_frame.GetValidFramesModifier()) {
|
||||
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
||||
storage::v3::View::NEW);
|
||||
|
||||
std::vector<msgs::Value> pk;
|
||||
for (auto *primary_property : primary_key_) {
|
||||
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
|
||||
}
|
||||
|
||||
auto vertex_id = std::make_pair(label, std::move(pk));
|
||||
auto [it, inserted] = used_vertex_ids.emplace(std::move(vertex_id));
|
||||
if (inserted) {
|
||||
req.vertex_ids.emplace_back(*it);
|
||||
}
|
||||
}
|
||||
|
||||
auto get_prop_result = std::invoke([&context, &request_router, &req]() mutable {
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
return request_router.GetProperties(req);
|
||||
});
|
||||
|
||||
for (auto &result : get_prop_result) {
|
||||
// TODO (gvolfing) figure out labels when relevant.
|
||||
msgs::Vertex vertex = {.id = result.vertex, .labels = {}};
|
||||
|
||||
id_to_accessor_mapping_.emplace(result.vertex,
|
||||
VertexAccessor(std::move(vertex), std::move(result.props), &request_router));
|
||||
}
|
||||
}
|
||||
|
||||
bool Pull(Frame &frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
@ -655,17 +690,108 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
||||
return false;
|
||||
}
|
||||
|
||||
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
}
|
||||
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
|
||||
}
|
||||
|
||||
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
EnsureOwnMultiFrameIsGood(output_multi_frame);
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
auto populated_any = false;
|
||||
|
||||
while (true) {
|
||||
switch (state_) {
|
||||
case State::PullInput: {
|
||||
id_to_accessor_mapping_.clear();
|
||||
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
||||
state_ = State::Exhausted;
|
||||
return populated_any;
|
||||
}
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
MakeRequestMultiFrame(*own_multi_frame_, *context.request_router, context);
|
||||
|
||||
state_ = State::PopulateOutput;
|
||||
break;
|
||||
}
|
||||
case State::PopulateOutput: {
|
||||
if (!output_multi_frame.HasInvalidFrame()) {
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
id_to_accessor_mapping_.clear();
|
||||
}
|
||||
return populated_any;
|
||||
}
|
||||
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
state_ = State::PullInput;
|
||||
continue;
|
||||
}
|
||||
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && own_frames_it_ != own_frames_consumer_->end();
|
||||
++own_frames_it_) {
|
||||
auto &output_frame = *output_frame_it;
|
||||
|
||||
ExpressionEvaluator evaluator(&*own_frames_it_, context.symbol_table, context.evaluation_context,
|
||||
context.request_router, storage::v3::View::NEW);
|
||||
|
||||
std::vector<msgs::Value> pk;
|
||||
for (auto *primary_property : primary_key_) {
|
||||
pk.push_back(TypedValueToValue(primary_property->Accept(evaluator)));
|
||||
}
|
||||
|
||||
const msgs::Label label = {.id = msgs::LabelId::FromUint(label_.AsUint())};
|
||||
auto vertex_id = std::make_pair(label, std::move(pk));
|
||||
|
||||
if (const auto it = id_to_accessor_mapping_.find(vertex_id); it != id_to_accessor_mapping_.end()) {
|
||||
output_frame = *own_frames_it_;
|
||||
output_frame[output_symbol_] = TypedValue(it->second);
|
||||
populated_any = true;
|
||||
++output_frame_it;
|
||||
}
|
||||
own_frames_it_->MakeInvalid();
|
||||
}
|
||||
break;
|
||||
}
|
||||
case State::Exhausted: {
|
||||
return populated_any;
|
||||
}
|
||||
}
|
||||
}
|
||||
return populated_any;
|
||||
};
|
||||
|
||||
void Reset() override { input_cursor_->Reset(); }
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
private:
|
||||
enum class State { PullInput, PopulateOutput, Exhausted };
|
||||
|
||||
State state_{State::PullInput};
|
||||
const Symbol output_symbol_;
|
||||
const UniqueCursorPtr input_cursor_;
|
||||
const char *op_name_;
|
||||
storage::v3::LabelId label_;
|
||||
std::optional<std::vector<Expression *>> filter_expressions_;
|
||||
std::vector<Expression *> primary_key_;
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
std::unordered_map<msgs::VertexId, VertexAccessor> id_to_accessor_mapping_;
|
||||
};
|
||||
|
||||
ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_symbol, storage::v3::View view)
|
||||
|
Loading…
Reference in New Issue
Block a user