Add first, but buggy implementation
This commit is contained in:
parent
575361827e
commit
f39a937323
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -34,6 +34,7 @@ class Frame {
|
||||
const TypedValue &at(const Symbol &symbol) const { return elems_.at(symbol.position()); }
|
||||
|
||||
auto &elems() { return elems_; }
|
||||
const auto &elems() const { return elems_; }
|
||||
|
||||
utils::MemoryResource *GetMemoryResource() const { return elems_.get_allocator().GetMemoryResource(); }
|
||||
|
||||
|
@ -522,12 +522,12 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
return true;
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &input_multi_frame, ExecutionContext &context) override {
|
||||
void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP(op_name_);
|
||||
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(input_multi_frame.GetFirstFrame().elems().size(), kNumberOfFramesInMultiframe,
|
||||
input_multi_frame.GetMemoryResource()));
|
||||
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
||||
|
||||
MakeRequest(context);
|
||||
PullNextFrames(context);
|
||||
@ -537,7 +537,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto &frame : input_multi_frame.GetInvalidFramesPopulator()) {
|
||||
for (auto &frame : output_multi_frame.GetInvalidFramesPopulator()) {
|
||||
if (MustAbort(context)) {
|
||||
throw HintedAbortError();
|
||||
}
|
||||
@ -2629,7 +2629,7 @@ class DistributedCreateExpandCursor : public Cursor {
|
||||
|
||||
class DistributedExpandCursor : public Cursor {
|
||||
public:
|
||||
explicit DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem)
|
||||
DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem)
|
||||
: self_(self),
|
||||
input_cursor_(self.input_->MakeCursor(mem)),
|
||||
current_in_edge_it_(current_in_edges_.begin()),
|
||||
@ -2666,16 +2666,10 @@ class DistributedExpandCursor : public Cursor {
|
||||
throw std::runtime_error("EdgeDirection Both not implemented");
|
||||
}
|
||||
};
|
||||
msgs::ExpandOneRequest request;
|
||||
// to not fetch any properties of the edges
|
||||
request.edge_properties.emplace();
|
||||
request.src_vertices.push_back(get_dst_vertex(edge, direction));
|
||||
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
|
||||
auto result_rows = context.request_router->ExpandOne(std::move(request));
|
||||
MG_ASSERT(result_rows.size() == 1);
|
||||
auto &result_row = result_rows.front();
|
||||
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
|
||||
msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router);
|
||||
|
||||
frame[self_.common_.node_symbol] =
|
||||
accessors::VertexAccessor(msgs::Vertex{get_dst_vertex(edge, direction)},
|
||||
std::vector<std::pair<msgs::PropertyId, msgs::Value>>{}, context.request_router);
|
||||
}
|
||||
|
||||
bool InitEdges(Frame &frame, ExecutionContext &context) {
|
||||
@ -2784,6 +2778,149 @@ class DistributedExpandCursor : public Cursor {
|
||||
}
|
||||
}
|
||||
|
||||
void InitEdgesMultiple(ExecutionContext &context) {
|
||||
TypedValue &vertex_value = (*own_frames_it_)[self_.input_symbol_];
|
||||
|
||||
if (vertex_value.IsNull()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
||||
auto &vertex = vertex_value.ValueVertex();
|
||||
|
||||
const auto convert_edges = [&vertex, &context](
|
||||
std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages,
|
||||
const EdgeAtom::Direction direction) {
|
||||
std::vector<EdgeAccessor> edge_accessors;
|
||||
edge_accessors.reserve(edge_messages.size());
|
||||
|
||||
switch (direction) {
|
||||
case EdgeAtom::Direction::IN: {
|
||||
for (auto &edge : edge_messages) {
|
||||
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
|
||||
context.request_router);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case EdgeAtom::Direction::OUT: {
|
||||
for (auto &edge : edge_messages) {
|
||||
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
|
||||
context.request_router);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case EdgeAtom::Direction::BOTH: {
|
||||
LOG_FATAL("Must indicate exact expansion direction here");
|
||||
}
|
||||
}
|
||||
return edge_accessors;
|
||||
};
|
||||
|
||||
auto *result_row = vertex_id_to_result_row[vertex.Id()];
|
||||
current_in_edges_.clear();
|
||||
current_in_edges_ =
|
||||
convert_edges(std::move(result_row->in_edges_with_specific_properties), EdgeAtom::Direction::IN);
|
||||
current_in_edge_it_ = current_in_edges_.begin();
|
||||
current_out_edges_ =
|
||||
convert_edges(std::move(result_row->out_edges_with_specific_properties), EdgeAtom::Direction::OUT);
|
||||
current_out_edge_it_ = current_out_edges_.begin();
|
||||
vertex_id_to_result_row.erase(vertex.Id());
|
||||
}
|
||||
|
||||
void PullEdgesFromStorage(ExecutionContext &context) {
|
||||
// Input Vertex could be null if it is created by a failed optional match. In
|
||||
// those cases we skip that input pull and continue with the next.
|
||||
|
||||
msgs::ExpandOneRequest request;
|
||||
request.direction = DirectionToMsgsDirection(self_.common_.direction);
|
||||
// to not fetch any properties of the edges
|
||||
request.edge_properties.emplace();
|
||||
for (const auto &frame : own_multi_frame_->GetValidFramesReader()) {
|
||||
const auto &vertex_value = frame[self_.input_symbol_];
|
||||
|
||||
// Null check due to possible failed optional match.
|
||||
MG_ASSERT(!vertex_value.IsNull());
|
||||
|
||||
ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex);
|
||||
auto &vertex = vertex_value.ValueVertex();
|
||||
request.src_vertices.push_back(vertex.Id());
|
||||
}
|
||||
|
||||
result_rows_ = std::invoke([&context, &request]() mutable {
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
return context.request_router->ExpandOne(std::move(request));
|
||||
});
|
||||
vertex_id_to_result_row.clear();
|
||||
for (auto &row : result_rows_) {
|
||||
vertex_id_to_result_row[row.src_vertex.id] = &row;
|
||||
}
|
||||
}
|
||||
|
||||
void PullMultiple(MultiFrame &output_multi_frame, ExecutionContext &context) override {
|
||||
SCOPED_PROFILE_OP("DistributedExpandMF");
|
||||
MG_ASSERT(!self_.common_.existing_node);
|
||||
EnsureOwnMultiFrameIsGood(output_multi_frame);
|
||||
// A helper function for expanding a node from an edge.
|
||||
|
||||
while (true) {
|
||||
if (MustAbort(context)) throw HintedAbortError();
|
||||
if (own_frames_it_ == own_frames_consumer_->end()) {
|
||||
input_cursor_->PullMultiple(*own_multi_frame_, context);
|
||||
own_frames_consumer_ = own_multi_frame_->GetValidFramesConsumer();
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
if (!own_multi_frame_->HasValidFrame()) {
|
||||
break;
|
||||
}
|
||||
|
||||
PullEdgesFromStorage(context);
|
||||
InitEdgesMultiple(context);
|
||||
}
|
||||
|
||||
while (own_frames_it_ != own_frames_consumer_->end()) {
|
||||
if (current_in_edge_it_ == current_in_edges_.end() && current_out_edge_it_ == current_out_edges_.end()) {
|
||||
own_frames_it_->MakeInvalid();
|
||||
++own_frames_it_;
|
||||
|
||||
InitEdgesMultiple(context);
|
||||
}
|
||||
|
||||
auto &input_frame = *own_frames_it_;
|
||||
|
||||
auto output_frames_populator = output_multi_frame.GetInvalidFramesPopulator();
|
||||
|
||||
auto populate_edges = [this, &context, &output_frames_populator, &input_frame](
|
||||
std::vector<EdgeAccessor>::iterator ¤t,
|
||||
const std::vector<EdgeAccessor>::iterator &end) {
|
||||
for (auto output_frame_it = output_frames_populator.begin();
|
||||
output_frame_it != output_frames_populator.end() && current != end; ++output_frame_it) {
|
||||
auto &edge = *current;
|
||||
++current;
|
||||
auto &output_frame = *output_frame_it++;
|
||||
output_frame = input_frame;
|
||||
output_frame[self_.common_.edge_symbol] = edge;
|
||||
PullDstVertex(output_frame, context, EdgeAtom::Direction::IN);
|
||||
}
|
||||
};
|
||||
populate_edges(current_in_edge_it_, current_in_edges_.end());
|
||||
populate_edges(current_out_edge_it_, current_out_edges_.end());
|
||||
|
||||
if (output_frames_populator.begin() == output_frames_populator.end()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void EnsureOwnMultiFrameIsGood(MultiFrame &output_multi_frame) {
|
||||
if (!own_multi_frame_.has_value()) {
|
||||
own_multi_frame_.emplace(MultiFrame(output_multi_frame.GetFirstFrame().elems().size(),
|
||||
kNumberOfFramesInMultiframe, output_multi_frame.GetMemoryResource()));
|
||||
own_frames_consumer_.emplace(own_multi_frame_->GetValidFramesConsumer());
|
||||
own_frames_it_ = own_frames_consumer_->begin();
|
||||
}
|
||||
MG_ASSERT(output_multi_frame.GetFirstFrame().elems().size() == own_multi_frame_->GetFirstFrame().elems().size());
|
||||
}
|
||||
|
||||
void Shutdown() override { input_cursor_->Shutdown(); }
|
||||
|
||||
void Reset() override {
|
||||
@ -2801,6 +2938,12 @@ class DistributedExpandCursor : public Cursor {
|
||||
std::vector<EdgeAccessor> current_out_edges_;
|
||||
std::vector<EdgeAccessor>::iterator current_in_edge_it_;
|
||||
std::vector<EdgeAccessor>::iterator current_out_edge_it_;
|
||||
std::optional<MultiFrame> own_multi_frame_;
|
||||
std::optional<ValidFramesConsumer> own_frames_consumer_;
|
||||
ValidFramesConsumer::Iterator own_frames_it_;
|
||||
std::vector<msgs::ExpandOneResultRow> result_rows_;
|
||||
// This won't work if any vertex id is duplicated in the input
|
||||
std::unordered_map<msgs::VertexId, msgs::ExpandOneResultRow *> vertex_id_to_result_row;
|
||||
};
|
||||
|
||||
} // namespace memgraph::query::v2::plan
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "storage/v3/id_types.hpp"
|
||||
#include "storage/v3/property_value.hpp"
|
||||
#include "storage/v3/result.hpp"
|
||||
#include "utils/fnv.hpp"
|
||||
|
||||
namespace memgraph::msgs {
|
||||
|
||||
@ -579,3 +580,48 @@ using WriteResponses = std::variant<CreateVerticesResponse, DeleteVerticesRespon
|
||||
CreateExpandResponse, DeleteEdgesResponse, UpdateEdgesResponse, CommitResponse>;
|
||||
|
||||
} // namespace memgraph::msgs
|
||||
|
||||
namespace std {
|
||||
|
||||
template <>
|
||||
struct hash<memgraph::msgs::Value>;
|
||||
|
||||
template <>
|
||||
struct hash<memgraph::msgs::VertexId> {
|
||||
size_t operator()(const memgraph::msgs::VertexId &id) const {
|
||||
using LabelId = memgraph::storage::v3::LabelId;
|
||||
using Value = memgraph::msgs::Value;
|
||||
return memgraph::utils::HashCombine<LabelId, std::vector<Value>, std::hash<LabelId>,
|
||||
memgraph::utils::FnvCollection<std::vector<Value>, Value>>{}(id.first.id,
|
||||
id.second);
|
||||
}
|
||||
};
|
||||
|
||||
template <>
|
||||
struct hash<memgraph::msgs::Value> {
|
||||
size_t operator()(const memgraph::msgs::Value &value) const {
|
||||
using Type = memgraph::msgs::Value::Type;
|
||||
switch (value.type) {
|
||||
case Type::Null:
|
||||
return std::hash<size_t>{}(0U);
|
||||
case Type::Bool:
|
||||
return std::hash<bool>{}(value.bool_v);
|
||||
case Type::Int64:
|
||||
return std::hash<int64_t>{}(value.int_v);
|
||||
case Type::Double:
|
||||
return std::hash<double>{}(value.double_v);
|
||||
case Type::String:
|
||||
return std::hash<std::string>{}(value.string_v);
|
||||
case Type::List:
|
||||
LOG_FATAL("Add hash for lists");
|
||||
case Type::Map:
|
||||
LOG_FATAL("Add hash for maps");
|
||||
case Type::Vertex:
|
||||
LOG_FATAL("Add hash for vertices");
|
||||
case Type::Edge:
|
||||
LOG_FATAL("Add hash for edges");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace std
|
||||
|
Loading…
Reference in New Issue
Block a user