|
|
|
@ -190,18 +190,19 @@ class DistributedCreateNodeCursor : public Cursor {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
|
|
|
|
bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
|
|
|
|
SCOPED_PROFILE_OP("CreateNodeMF");
|
|
|
|
|
input_cursor_->PullMultiple(multi_frame, context);
|
|
|
|
|
|
|
|
|
|
auto *request_router = context.request_router;
|
|
|
|
|
if (!multi_frame.HasValidFrame()) {
|
|
|
|
|
return;
|
|
|
|
|
if (!input_cursor_->PullMultiple(multi_frame, context)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
{
|
|
|
|
|
SCOPED_REQUEST_WAIT_PROFILE;
|
|
|
|
|
request_router->CreateVertices(NodeCreationInfoToRequests(context, multi_frame));
|
|
|
|
|
}
|
|
|
|
|
PlaceNodesOnTheMultiFrame(multi_frame, context);
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
|
|
|
@ -325,14 +326,16 @@ bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
|
|
|
|
bool Once::OnceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
|
|
|
|
SCOPED_PROFILE_OP("OnceMF");
|
|
|
|
|
|
|
|
|
|
if (!did_pull_) {
|
|
|
|
|
auto &first_frame = multi_frame.GetFirstFrame();
|
|
|
|
|
first_frame.MakeValid();
|
|
|
|
|
did_pull_ = true;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
UniqueCursorPtr Once::MakeCursor(utils::MemoryResource *mem) const {
|
|
|
|
@ -453,8 +456,6 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|
|
|
|
ResetExecutionState();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
enum class State : int8_t { INITIALIZING, COMPLETED };
|
|
|
|
|
|
|
|
|
|
using VertexAccessor = accessors::VertexAccessor;
|
|
|
|
|
|
|
|
|
|
bool MakeRequest(ExecutionContext &context) {
|
|
|
|
@ -491,60 +492,73 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool PullNextFrames(ExecutionContext &context) {
|
|
|
|
|
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
|
|
|
|
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
|
|
|
|
own_frames_it_ = own_frames_consumer_->begin();
|
|
|
|
|
return own_multi_frame_->HasValidFrame();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
inline bool HasMoreResult() {
|
|
|
|
|
return current_vertex_it_ != current_batch_.end() && own_frames_it_ != own_frames_consumer_->end();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool PopulateFrame(ExecutionContext &context, FrameWithValidity &frame) {
|
|
|
|
|
MG_ASSERT(HasMoreResult());
|
|
|
|
|
|
|
|
|
|
frame = *own_frames_it_;
|
|
|
|
|
frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
|
|
|
|
|
|
|
|
|
++current_vertex_it_;
|
|
|
|
|
if (current_vertex_it_ == current_batch_.end()) {
|
|
|
|
|
own_frames_it_->MakeInvalid();
|
|
|
|
|
++own_frames_it_;
|
|
|
|
|
|
|
|
|
|
current_vertex_it_ = current_batch_.begin();
|
|
|
|
|
|
|
|
|
|
if (own_frames_it_ == own_frames_consumer_->end()) {
|
|
|
|
|
return PullNextFrames(context);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
|
|
|
|
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
|
|
|
|
SCOPED_PROFILE_OP(op_name_);
|
|
|
|
|
|
|
|
|
|
if (!own_multi_frame_.has_value()) {
|
|
|
|
|
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
|
|
|
|
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
|
|
|
|
|
|
|
|
|
MakeRequest(context);
|
|
|
|
|
PullNextFrames(context);
|
|
|
|
|
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
|
|
|
|
own_frames_it_ = own_frames_consumer_->begin();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!HasMoreResult()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
|
|
|
|
auto populated_any = false;
|
|
|
|
|
|
|
|
|
|
for (auto &frame : output_multi_frame.GetInvalidFramesPopulator()) {
|
|
|
|
|
if (MustAbort(context)) {
|
|
|
|
|
throw HintedAbortError();
|
|
|
|
|
}
|
|
|
|
|
if (!PopulateFrame(context, frame)) {
|
|
|
|
|
return;
|
|
|
|
|
while (true) {
|
|
|
|
|
switch (state_) {
|
|
|
|
|
case State::PullInput: {
|
|
|
|
|
if (!input_cursor_->PullMultiple(*own_multi_frame_, context)) {
|
|
|
|
|
state_ = State::Exhausted;
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
|
|
|
|
own_frames_it_ = own_frames_consumer_->begin();
|
|
|
|
|
state_ = State::FetchVertices;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case State::FetchVertices: {
|
|
|
|
|
if (own_frames_it_ == own_frames_consumer_->end()) {
|
|
|
|
|
state_ = State::PullInput;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
if (!filter_expressions_->empty() || property_expression_pair_.has_value() || current_batch_.empty()) {
|
|
|
|
|
MakeRequest(context);
|
|
|
|
|
} else {
|
|
|
|
|
// We can reuse the vertices as they don't depend on any value from the frames
|
|
|
|
|
current_vertex_it_ = current_batch_.begin();
|
|
|
|
|
}
|
|
|
|
|
state_ = State::PopulateOutput;
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case State::PopulateOutput: {
|
|
|
|
|
if (!output_multi_frame.HasInvalidFrame()) {
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
if (current_vertex_it_ == current_batch_.end()) {
|
|
|
|
|
own_frames_it_->MakeInvalid();
|
|
|
|
|
++own_frames_it_;
|
|
|
|
|
state_ = State::FetchVertices;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (auto output_frame_it = output_frames_populator.begin();
|
|
|
|
|
output_frame_it != output_frames_populator.end() && current_vertex_it_ != current_batch_.end();
|
|
|
|
|
++output_frame_it) {
|
|
|
|
|
auto &output_frame = *output_frame_it;
|
|
|
|
|
output_frame = *own_frames_it_;
|
|
|
|
|
output_frame[output_symbol_] = TypedValue(*current_vertex_it_);
|
|
|
|
|
current_vertex_it_++;
|
|
|
|
|
populated_any = true;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case State::Exhausted: {
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return populated_any;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
|
|
|
@ -560,6 +574,9 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
enum class State { PullInput, FetchVertices, PopulateOutput, Exhausted };
|
|
|
|
|
|
|
|
|
|
State state_{State::PullInput};
|
|
|
|
|
const Symbol output_symbol_;
|
|
|
|
|
const UniqueCursorPtr input_cursor_;
|
|
|
|
|
const char *op_name_;
|
|
|
|
@ -638,10 +655,6 @@ class DistributedScanByPrimaryKeyCursor : public Cursor {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PullMultiple(MultiFrame & /*input_multi_frame*/, ExecutionContext & /*context*/) override {
|
|
|
|
|
throw utils::NotYetImplemented("Multiframe version of ScanByPrimaryKey is yet to be implemented.");
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
void Reset() override { input_cursor_->Reset(); }
|
|
|
|
|
|
|
|
|
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
|
|
|
@ -906,6 +919,27 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool Filter::FilterCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
|
|
|
|
SCOPED_PROFILE_OP("Filter");
|
|
|
|
|
auto populated_any = false;
|
|
|
|
|
|
|
|
|
|
while (multi_frame.HasInvalidFrame()) {
|
|
|
|
|
if (!input_cursor_->PullMultiple(multi_frame, context)) {
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
for (auto &frame : multi_frame.GetValidFramesConsumer()) {
|
|
|
|
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
|
|
|
|
storage::v3::View::OLD);
|
|
|
|
|
if (!EvaluateFilter(evaluator, self_.expression_)) {
|
|
|
|
|
frame.MakeInvalid();
|
|
|
|
|
} else {
|
|
|
|
|
populated_any = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Filter::FilterCursor::Shutdown() { input_cursor_->Shutdown(); }
|
|
|
|
|
|
|
|
|
|
void Filter::FilterCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
@ -941,19 +975,22 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
|
|
|
|
|
// Produce should always yield the latest results.
|
|
|
|
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
|
|
|
|
storage::v3::View::NEW);
|
|
|
|
|
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
|
|
|
|
|
for (auto *named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
|
|
|
|
bool Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) {
|
|
|
|
|
SCOPED_PROFILE_OP("ProduceMF");
|
|
|
|
|
|
|
|
|
|
input_cursor_->PullMultiple(multi_frame, context);
|
|
|
|
|
if (!input_cursor_->PullMultiple(multi_frame, context)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto iterator_for_valid_frame_only = multi_frame.GetValidFramesModifier();
|
|
|
|
|
|
|
|
|
|
for (auto &frame : iterator_for_valid_frame_only) {
|
|
|
|
|
// Produce should always yield the latest results.
|
|
|
|
|
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
|
|
|
|
@ -963,7 +1000,9 @@ void Produce::ProduceCursor::PullMultiple(MultiFrame &multi_frame, ExecutionCont
|
|
|
|
|
named_expr->Accept(evaluator);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Produce::ProduceCursor::Shutdown() { input_cursor_->Shutdown(); }
|
|
|
|
|
|
|
|
|
@ -988,6 +1027,8 @@ Delete::DeleteCursor::DeleteCursor(const Delete &self, utils::MemoryResource *me
|
|
|
|
|
|
|
|
|
|
bool Delete::DeleteCursor::Pull(Frame & /*frame*/, ExecutionContext & /*context*/) { return false; }
|
|
|
|
|
|
|
|
|
|
bool Delete::DeleteCursor::PullMultiple(MultiFrame & /*multi_frame*/, ExecutionContext & /*context*/) { return false; }
|
|
|
|
|
|
|
|
|
|
void Delete::DeleteCursor::Shutdown() { input_cursor_->Shutdown(); }
|
|
|
|
|
|
|
|
|
|
void Delete::DeleteCursor::Reset() { input_cursor_->Reset(); }
|
|
|
|
@ -2566,11 +2607,10 @@ class DistributedCreateExpandCursor : public Cursor {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
|
|
|
|
bool PullMultiple(MultiFrame &multi_frame, ExecutionContext &context) override {
|
|
|
|
|
SCOPED_PROFILE_OP("CreateExpandMF");
|
|
|
|
|
input_cursor_->PullMultiple(multi_frame, context);
|
|
|
|
|
if (!multi_frame.HasValidFrame()) {
|
|
|
|
|
return;
|
|
|
|
|
if (!input_cursor_->PullMultiple(multi_frame, context)) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
auto request_vertices = ExpandCreationInfoToRequests(multi_frame, context);
|
|
|
|
|
{
|
|
|
|
@ -2583,6 +2623,7 @@ class DistributedCreateExpandCursor : public Cursor {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Shutdown() override { input_cursor_->Shutdown(); }
|
|
|
|
@ -2879,65 +2920,53 @@ class DistributedExpandCursor : public Cursor {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void InitEdgesMultiple(ExecutionContext &context) {
|
|
|
|
|
TypedValue &vertex_value = (*own_frames_it_)[self_.input_symbol_];
|
|
|
|
|
void InitEdgesMultiple() {
|
|
|
|
|
// This function won't work if any vertex id is duplicated in the input, because:
|
|
|
|
|
// 1. vertex_id_to_result_row is not a multimap
|
|
|
|
|
// 2. if self_.common_.existing_node is true, then we erase edges that might be necessary for the input vertex on a
|
|
|
|
|
// later frame
|
|
|
|
|
const auto &frame = (*own_frames_it_);
|
|
|
|
|
const auto &vertex_value = frame[self_.input_symbol_];
|
|
|
|
|
|
|
|
|
|
if (vertex_value.IsNull()) {
|
|
|
|
|
ResetMultiFrameEdgeIts();
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
|
|
|
|
auto &vertex = vertex_value.ValueVertex();
|
|
|
|
|
const auto &vertex = vertex_value.ValueVertex();
|
|
|
|
|
|
|
|
|
|
const auto convert_edges = [&vertex, &context](
|
|
|
|
|
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
|
|
|
|
|
const EdgeAtom::Direction direction) {
|
|
|
|
|
std::vector<EdgeAccessor> edge_accessors;
|
|
|
|
|
edge_accessors.reserve(edge_messages.size());
|
|
|
|
|
current_vertex_ = &vertex;
|
|
|
|
|
|
|
|
|
|
switch (direction) {
|
|
|
|
|
case EdgeAtom::Direction::IN: {
|
|
|
|
|
for (auto &edge : edge_messages) {
|
|
|
|
|
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
|
|
|
|
|
context.request_router);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case EdgeAtom::Direction::OUT: {
|
|
|
|
|
for (auto &edge : edge_messages) {
|
|
|
|
|
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
|
|
|
|
|
context.request_router);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case EdgeAtom::Direction::BOTH: {
|
|
|
|
|
LOG_FATAL("Must indicate exact expansion direction here");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return edge_accessors;
|
|
|
|
|
};
|
|
|
|
|
auto &ref_counted_result_row = vertex_id_to_result_row.at(vertex.Id());
|
|
|
|
|
auto &result_row = *ref_counted_result_row.result_row;
|
|
|
|
|
|
|
|
|
|
auto *result_row = vertex_id_to_result_row[vertex.Id()];
|
|
|
|
|
current_in_edges_.clear();
|
|
|
|
|
current_in_edges_ =
|
|
|
|
|
convert_edges(std::move(result_row->in_edges_with_specific_properties), EdgeAtom::Direction::IN);
|
|
|
|
|
current_in_edge_it_ = current_in_edges_.begin();
|
|
|
|
|
current_out_edges_ =
|
|
|
|
|
convert_edges(std::move(result_row->out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
|
|
|
|
|
current_out_edge_it_ = current_out_edges_.begin();
|
|
|
|
|
vertex_id_to_result_row.erase(vertex.Id());
|
|
|
|
|
current_in_edge_mf_it_ = result_row.in_edges_with_specific_properties.begin();
|
|
|
|
|
in_edges_end_it_ = result_row.in_edges_with_specific_properties.end();
|
|
|
|
|
AdvanceUntilSuitableEdge(current_in_edge_mf_it_, in_edges_end_it_);
|
|
|
|
|
current_out_edge_mf_it_ = result_row.out_edges_with_specific_properties.begin();
|
|
|
|
|
out_edges_end_it_ = result_row.out_edges_with_specific_properties.end();
|
|
|
|
|
AdvanceUntilSuitableEdge(current_out_edge_mf_it_, out_edges_end_it_);
|
|
|
|
|
|
|
|
|
|
if (ref_counted_result_row.ref_count == 1) {
|
|
|
|
|
vertex_id_to_result_row.erase(vertex.Id());
|
|
|
|
|
} else {
|
|
|
|
|
ref_counted_result_row.ref_count--;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool PullInputFrames(ExecutionContext &context) {
|
|
|
|
|
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
|
|
|
|
const auto pulled_any = input_cursor_->PullMultiple(*own_multi_frame_, context);
|
|
|
|
|
// These needs to be updated regardless of the result of the pull, otherwise the consumer and iterator might
|
|
|
|
|
// get corrupted because of the operations done on our MultiFrame.
|
|
|
|
|
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
|
|
|
|
own_frames_it_ = own_frames_consumer_->begin();
|
|
|
|
|
if (!own_multi_frame_->HasValidFrame()) {
|
|
|
|
|
if (!pulled_any) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vertex_id_to_result_row.clear();
|
|
|
|
|
|
|
|
|
|
msgs::ExpandOneRequest request;
|
|
|
|
|
request.direction = DirectionToMsgsDirection(self_.common_.direction);
|
|
|
|
|
// to not fetch any properties of the edges
|
|
|
|
@ -2950,35 +2979,40 @@ class DistributedExpandCursor : public Cursor {
|
|
|
|
|
|
|
|
|
|
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
|
|
|
|
const auto &vertex = vertex_value.ValueVertex();
|
|
|
|
|
request.src_vertices.push_back(vertex.Id());
|
|
|
|
|
auto [it, inserted] = vertex_id_to_result_row.try_emplace(vertex.Id(), RefCountedResultRow{1U, nullptr});
|
|
|
|
|
|
|
|
|
|
if (inserted) {
|
|
|
|
|
request.src_vertices.push_back(vertex.Id());
|
|
|
|
|
} else {
|
|
|
|
|
it->second.ref_count++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
result_rows_ = std::invoke([&context, &request]() mutable {
|
|
|
|
|
SCOPED_REQUEST_WAIT_PROFILE;
|
|
|
|
|
return context.request_router->ExpandOne(std::move(request));
|
|
|
|
|
});
|
|
|
|
|
vertex_id_to_result_row.clear();
|
|
|
|
|
for (auto &row : result_rows_) {
|
|
|
|
|
vertex_id_to_result_row[row.src_vertex.id] = &row;
|
|
|
|
|
vertex_id_to_result_row[row.src_vertex.id].result_row = &row;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
|
|
|
|
bool PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
|
|
|
|
SCOPED_PROFILE_OP("DistributedExpandMF");
|
|
|
|
|
MG_ASSERT(!self_.common_.existing_node);
|
|
|
|
|
EnsureOwnMultiFrameIsGood(output_multi_frame);
|
|
|
|
|
// A helper function for expanding a node from an edge.
|
|
|
|
|
|
|
|
|
|
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
|
|
|
|
auto populated_any = false;
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
switch (state_) {
|
|
|
|
|
case State::PullInputAndEdges: {
|
|
|
|
|
if (!PullInputFrames(context)) {
|
|
|
|
|
state_ = State::Exhausted;
|
|
|
|
|
return;
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
state_ = State::InitInOutEdgesIt;
|
|
|
|
|
break;
|
|
|
|
@ -2987,43 +3021,62 @@ class DistributedExpandCursor : public Cursor {
|
|
|
|
|
if (own_frames_it_ == own_frames_consumer_->end()) {
|
|
|
|
|
state_ = State::PullInputAndEdges;
|
|
|
|
|
} else {
|
|
|
|
|
InitEdges(*own_frames_it_, context);
|
|
|
|
|
InitEdgesMultiple();
|
|
|
|
|
state_ = State::PopulateOutput;
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case State::PopulateOutput: {
|
|
|
|
|
if (!output_multi_frame.HasInvalidFrame()) {
|
|
|
|
|
return;
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) {
|
|
|
|
|
if (current_in_edge_mf_it_ == in_edges_end_it_ && current_out_edge_mf_it_ == out_edges_end_it_) {
|
|
|
|
|
own_frames_it_->MakeInvalid();
|
|
|
|
|
++own_frames_it_;
|
|
|
|
|
state_ = State::InitInOutEdgesIt;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
auto populate_edges = [this, &context, &output_frames_populator](
|
|
|
|
|
const EdgeAtom::Direction direction, std::vector<EdgeAccessor>::iterator ¤t,
|
|
|
|
|
const std::vector<EdgeAccessor>::iterator &end) {
|
|
|
|
|
auto populate_edges = [this, &context, &output_frames_populator, &populated_any](
|
|
|
|
|
const EdgeAtom::Direction direction, EdgesIterator ¤t,
|
|
|
|
|
const EdgesIterator &end) {
|
|
|
|
|
for (auto output_frame_it = output_frames_populator.begin();
|
|
|
|
|
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
|
|
|
|
|
auto &edge = *current;
|
|
|
|
|
++current;
|
|
|
|
|
auto &output_frame = *output_frame_it;
|
|
|
|
|
output_frame = *own_frames_it_;
|
|
|
|
|
output_frame[self_.common_.edge_symbol] = edge;
|
|
|
|
|
switch (direction) {
|
|
|
|
|
case EdgeAtom::Direction::IN: {
|
|
|
|
|
output_frame[self_.common_.edge_symbol] =
|
|
|
|
|
EdgeAccessor{msgs::Edge{edge.other_end, current_vertex_->Id(), {}, {edge.gid}, edge.type},
|
|
|
|
|
context.request_router};
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case EdgeAtom::Direction::OUT: {
|
|
|
|
|
output_frame[self_.common_.edge_symbol] =
|
|
|
|
|
EdgeAccessor{msgs::Edge{current_vertex_->Id(), edge.other_end, {}, {edge.gid}, edge.type},
|
|
|
|
|
context.request_router};
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case EdgeAtom::Direction::BOTH: {
|
|
|
|
|
LOG_FATAL("Must indicate exact expansion direction here");
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
PullDstVertex(output_frame, context, direction);
|
|
|
|
|
++current;
|
|
|
|
|
AdvanceUntilSuitableEdge(current, end);
|
|
|
|
|
populated_any = true;
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
populate_edges(EdgeAtom::Direction::IN, current_in_edge_it_, current_in_edges_.end());
|
|
|
|
|
populate_edges(EdgeAtom::Direction::OUT, current_out_edge_it_, current_out_edges_.end());
|
|
|
|
|
populate_edges(EdgeAtom::Direction::IN, current_in_edge_mf_it_, in_edges_end_it_);
|
|
|
|
|
populate_edges(EdgeAtom::Direction::OUT, current_out_edge_mf_it_, out_edges_end_it_);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case State::Exhausted: {
|
|
|
|
|
return;
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return populated_any;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
|
|
|
|
@ -3046,28 +3099,70 @@ class DistributedExpandCursor : public Cursor {
|
|
|
|
|
own_frames_consumer_.reset();
|
|
|
|
|
own_multi_frame_->MakeAllFramesInvalid();
|
|
|
|
|
state_ = State::PullInputAndEdges;
|
|
|
|
|
|
|
|
|
|
current_in_edges_.clear();
|
|
|
|
|
current_out_edges_.clear();
|
|
|
|
|
current_in_edge_it_ = current_in_edges_.end();
|
|
|
|
|
current_out_edge_it_ = current_out_edges_.end();
|
|
|
|
|
|
|
|
|
|
ResetMultiFrameEdgeIts();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
enum class State { PullInputAndEdges, InitInOutEdgesIt, PopulateOutput, Exhausted };
|
|
|
|
|
|
|
|
|
|
struct RefCountedResultRow {
|
|
|
|
|
size_t ref_count{0U};
|
|
|
|
|
msgs::ExpandOneResultRow *result_row{nullptr};
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties;
|
|
|
|
|
using EdgesVector = std::vector<EdgeWithSpecificProperties>;
|
|
|
|
|
using EdgesIterator = EdgesVector::iterator;
|
|
|
|
|
|
|
|
|
|
void ResetMultiFrameEdgeIts() {
|
|
|
|
|
in_edges_end_it_ = EdgesIterator{};
|
|
|
|
|
current_in_edge_mf_it_ = in_edges_end_it_;
|
|
|
|
|
out_edges_end_it_ = EdgesIterator{};
|
|
|
|
|
current_out_edge_mf_it_ = out_edges_end_it_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void AdvanceUntilSuitableEdge(EdgesIterator ¤t, const EdgesIterator &end) {
|
|
|
|
|
if (!self_.common_.existing_node) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const auto &existing_node_value = (*own_frames_it_)[self_.common_.node_symbol];
|
|
|
|
|
if (existing_node_value.IsNull()) {
|
|
|
|
|
current = end;
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
const auto &existing_node = existing_node_value.ValueVertex();
|
|
|
|
|
current = std::find_if(current, end, [&existing_node](const EdgeWithSpecificProperties &edge) {
|
|
|
|
|
return edge.other_end == existing_node.Id();
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const Expand &self_;
|
|
|
|
|
const UniqueCursorPtr input_cursor_;
|
|
|
|
|
EdgesIterator current_in_edge_mf_it_;
|
|
|
|
|
EdgesIterator in_edges_end_it_;
|
|
|
|
|
EdgesIterator current_out_edge_mf_it_;
|
|
|
|
|
EdgesIterator out_edges_end_it_;
|
|
|
|
|
State state_{State::PullInputAndEdges};
|
|
|
|
|
std::optional<MultiFrame> own_multi_frame_;
|
|
|
|
|
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
|
|
|
|
const VertexAccessor *current_vertex_{nullptr};
|
|
|
|
|
ValidFramesConsumer::Iterator own_frames_it_;
|
|
|
|
|
std::vector<msgs::ExpandOneResultRow> result_rows_;
|
|
|
|
|
// This won't work if any vertex id is duplicated in the input
|
|
|
|
|
std::unordered_map<msgs::VertexId, RefCountedResultRow> vertex_id_to_result_row;
|
|
|
|
|
|
|
|
|
|
// TODO(antaljanosbenjamin): Remove when single frame approach is removed
|
|
|
|
|
std::vector<EdgeAccessor> current_in_edges_;
|
|
|
|
|
std::vector<EdgeAccessor> current_out_edges_;
|
|
|
|
|
std::vector<EdgeAccessor>::iterator current_in_edge_it_;
|
|
|
|
|
std::vector<EdgeAccessor>::iterator current_out_edge_it_;
|
|
|
|
|
State state_{State::PullInputAndEdges};
|
|
|
|
|
std::optional<MultiFrame> own_multi_frame_;
|
|
|
|
|
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
|
|
|
|
ValidFramesConsumer::Iterator own_frames_it_;
|
|
|
|
|
std::vector<msgs::ExpandOneResultRow> result_rows_;
|
|
|
|
|
// This won't work if any vertex id is duplicated in the input
|
|
|
|
|
std::unordered_map<msgs::VertexId, msgs::ExpandOneResultRow *> vertex_id_to_result_row;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
} // namespace memgraph::query::v2::plan
|
|
|
|
|