From f39a937323ccdec367eb37373cc15b6063b7539b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= Date: Wed, 18 Jan 2023 13:31:35 +0100 Subject: [PATCH] Add first, but buggy implementation --- src/expr/interpret/frame.hpp | 3 +- src/query/v2/plan/operator.cpp | 173 ++++++++++++++++++++++++++++++--- src/query/v2/requests.hpp | 46 +++++++++ 3 files changed, 206 insertions(+), 16 deletions(-) diff --git a/src/expr/interpret/frame.hpp b/src/expr/interpret/frame.hpp index 1cd6a99ce..9f4068226 100644 --- a/src/expr/interpret/frame.hpp +++ b/src/expr/interpret/frame.hpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -34,6 +34,7 @@ class Frame { const TypedValue &at(const Symbol &symbol) const { return elems_.at(symbol.position()); } auto &elems() { return elems_; } + const auto &elems() const { return elems_; } utils::MemoryResource *GetMemoryResource() const { return elems_.get_allocator().GetMemoryResource(); } diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 9841791e6..900ec15a0 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -522,12 +522,12 @@ class DistributedScanAllAndFilterCursor : public Cursor { return true; } - void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override { + void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); if (!own_multi_frame_.has_value()) { - own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe, - input_multi_frame.GetMemoryResource())); + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); MakeRequest(context); PullNextFrames(context); @@ -537,7 +537,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { return; } - for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) { + for (auto &frame : output_multi_frame.GetInvalidFramesPopulator()) { if (MustAbort(context)) { throw HintedAbortError(); } @@ -2629,7 +2629,7 @@ class DistributedCreateExpandCursor : public Cursor { class DistributedExpandCursor : public Cursor { public: - explicit DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem) + DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem) : self_(self), input_cursor_(self.input_->MakeCursor(mem)), current_in_edge_it_(current_in_edges_.begin()), @@ -2666,16 +2666,10 @@ class DistributedExpandCursor : public Cursor { throw std::runtime_error("EdgeDirection Both not implemented"); } }; - msgs::ExpandOneRequest request; - // to not fetch any properties of the edges - request.edge_properties.emplace(); - request.src_vertices.push_back(get_dst_vertex(edge, direction)); - request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; - auto result_rows = context.request_router->ExpandOne(std::move(request)); - MG_ASSERT(result_rows.size() == 1); - auto &result_row = result_rows.front(); - frame[self_.common_.node_symbol] = accessors::VertexAccessor( - msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router); + + frame[self_.common_.node_symbol] = + accessors::VertexAccessor(msgs::Vertex{get_dst_vertex(edge, direction)}, + std::vector>{}, context.request_router); } bool InitEdges(Frame &frame, ExecutionContext &context) { @@ -2784,6 +2778,149 @@ class DistributedExpandCursor : public Cursor { } } + void InitEdgesMultiple(ExecutionContext &context) { + TypedValue &vertex_value = (*own_frames_it_)[self_.input_symbol_]; + + if (vertex_value.IsNull()) { + return; + } + + ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); + auto &vertex = vertex_value.ValueVertex(); + + const auto convert_edges = [&vertex, &context]( + std::vector &&edge_messages, + const EdgeAtom::Direction direction) { + std::vector edge_accessors; + edge_accessors.reserve(edge_messages.size()); + + switch (direction) { + case EdgeAtom::Direction::IN: { + for (auto &edge : edge_messages) { + edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}, + context.request_router); + } + break; + } + case EdgeAtom::Direction::OUT: { + for (auto &edge : edge_messages) { + edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}, + context.request_router); + } + break; + } + case EdgeAtom::Direction::BOTH: { + LOG_FATAL("Must indicate exact expansion direction here"); + } + } + return edge_accessors; + }; + + auto *result_row = vertex_id_to_result_row[vertex.Id()]; + current_in_edges_.clear(); + current_in_edges_ = + convert_edges(std::move(result_row->in_edges_with_specific_properties), EdgeAtom::Direction::IN); + current_in_edge_it_ = current_in_edges_.begin(); + current_out_edges_ = + convert_edges(std::move(result_row->out_edges_with_specific_properties), EdgeAtom::Direction::OUT); + current_out_edge_it_ = current_out_edges_.begin(); + vertex_id_to_result_row.erase(vertex.Id()); + } + + void PullEdgesFromStorage(ExecutionContext &context) { + // Input Vertex could be null if it is created by a failed optional match. In + // those cases we skip that input pull and continue with the next. + + msgs::ExpandOneRequest request; + request.direction = DirectionToMsgsDirection(self_.common_.direction); + // to not fetch any properties of the edges + request.edge_properties.emplace(); + for (const auto &frame : own_multi_frame_->GetValidFramesReader()) { + const auto &vertex_value = frame[self_.input_symbol_]; + + // Null check due to possible failed optional match. + MG_ASSERT(!vertex_value.IsNull()); + + ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); + auto &vertex = vertex_value.ValueVertex(); + request.src_vertices.push_back(vertex.Id()); + } + + result_rows_ = std::invoke([&context, &request]() mutable { + SCOPED_REQUEST_WAIT_PROFILE; + return context.request_router->ExpandOne(std::move(request)); + }); + vertex_id_to_result_row.clear(); + for (auto &row : result_rows_) { + vertex_id_to_result_row[row.src_vertex.id] = &row; + } + } + + void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("DistributedExpandMF"); + MG_ASSERT(!self_.common_.existing_node); + EnsureOwnMultiFrameIsGood(output_multi_frame); + // A helper function for expanding a node from an edge. + + while (true) { + if (MustAbort(context)) throw HintedAbortError(); + if (own_frames_it_ == own_frames_consumer_->end()) { + input_cursor_->PullMultiple(*own_multi_frame_, context); + own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer(); + own_frames_it_ = own_frames_consumer_->begin(); + if (!own_multi_frame_->HasValidFrame()) { + break; + } + + PullEdgesFromStorage(context); + InitEdgesMultiple(context); + } + + while (own_frames_it_ != own_frames_consumer_->end()) { + if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) { + own_frames_it_->MakeInvalid(); + ++own_frames_it_; + + InitEdgesMultiple(context); + } + + auto &input_frame = *own_frames_it_; + + auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator(); + + auto populate_edges = [this, &context, &output_frames_populator, &input_frame]( + std::vector::iterator ¤t, + const std::vector::iterator &end) { + for (auto output_frame_it = output_frames_populator.begin(); + output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) { + auto &edge = *current; + ++current; + auto &output_frame = *output_frame_it++; + output_frame = input_frame; + output_frame[self_.common_.edge_symbol] = edge; + PullDstVertex(output_frame, context, EdgeAtom::Direction::IN); + } + }; + populate_edges(current_in_edge_it_, current_in_edges_.end()); + populate_edges(current_out_edge_it_, current_out_edges_.end()); + + if (output_frames_populator.begin() == output_frames_populator.end()) { + return; + } + } + } + } + + void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) { + if (!own_multi_frame_.has_value()) { + own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(), + kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource())); + own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer()); + own_frames_it_ = own_frames_consumer_->begin(); + } + MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size()); + } + void Shutdown() override { input_cursor_->Shutdown(); } void Reset() override { @@ -2801,6 +2938,12 @@ class DistributedExpandCursor : public Cursor { std::vector current_out_edges_; std::vector::iterator current_in_edge_it_; std::vector::iterator current_out_edge_it_; + std::optional own_multi_frame_; + std::optional own_frames_consumer_; + ValidFramesConsumer::Iterator own_frames_it_; + std::vector result_rows_; + // This won't work if any vertex id is duplicated in the input + std::unordered_map vertex_id_to_result_row; }; } // namespace memgraph::query::v2::plan diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 2335fea7d..b2d7f9123 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -25,6 +25,7 @@ #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/result.hpp" +#include "utils/fnv.hpp" namespace memgraph::msgs { @@ -579,3 +580,48 @@ using WriteResponses = std::variant; } // namespace memgraph::msgs + +namespace std { + +template <> +struct hash; + +template <> +struct hash { + size_t operator()(const memgraph::msgs::VertexId &id) const { + using LabelId = memgraph::storage::v3::LabelId; + using Value = memgraph::msgs::Value; + return memgraph::utils::HashCombine, std::hash, + memgraph::utils::FnvCollection, Value>>{}(id.first.id, + id.second); + } +}; + +template <> +struct hash { + size_t operator()(const memgraph::msgs::Value &value) const { + using Type = memgraph::msgs::Value::Type; + switch (value.type) { + case Type::Null: + return std::hash{}(0U); + case Type::Bool: + return std::hash{}(value.bool_v); + case Type::Int64: + return std::hash{}(value.int_v); + case Type::Double: + return std::hash{}(value.double_v); + case Type::String: + return std::hash{}(value.string_v); + case Type::List: + LOG_FATAL("Add hash for lists"); + case Type::Map: + LOG_FATAL("Add hash for maps"); + case Type::Vertex: + LOG_FATAL("Add hash for vertices"); + case Type::Edge: + LOG_FATAL("Add hash for edges"); + } + } +}; + +} // namespace std