From f89a2bbf428563b48db7454bdfe49d5e8fdcb137 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1nos=20Benjamin=20Antal?= <antaljanosbenjamin@users.noreply.github.com> Date: Thu, 20 Oct 2022 11:35:00 +0200 Subject: [PATCH] Make ExpandOne work in the query engine (#589) --- src/coordinator/shard_map.hpp | 16 +- src/glue/v2/communication.cpp | 1 + src/memgraph.cpp | 4 +- src/query/v2/accessors.cpp | 24 +- src/query/v2/accessors.hpp | 42 +- src/query/v2/context.hpp | 2 +- src/query/v2/conversions.hpp | 2 +- src/query/v2/interpreter.cpp | 1 + src/query/v2/plan/operator.cpp | 160 ++++++- src/query/v2/plan/vertex_count_cache.hpp | 2 +- src/query/v2/requests.hpp | 79 ++- src/query/v2/shard_request_manager.hpp | 61 +-- src/storage/v3/bindings/db_accessor.hpp | 2 +- src/storage/v3/shard.hpp | 4 +- src/storage/v3/shard_rsm.cpp | 449 ++++++++---------- src/storage/v3/value_conversions.hpp | 28 +- src/storage/v3/vertex_accessor.cpp | 1 + .../distributed_queries.py | 29 +- tests/simulation/common.hpp | 84 ++-- tests/simulation/shard_request_manager.cpp | 375 ++++++++------- tests/simulation/shard_rsm.cpp | 171 +++---- tests/unit/machine_manager.cpp | 44 +- 22 files changed, 884 insertions(+), 697 deletions(-) diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index 93ddd78c6..78a80e9e7 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -208,7 +208,13 @@ struct ShardMap { // Find a random place for the server to plug in } - LabelId GetLabelId(const std::string &label) const { return labels.at(label); } + std::optional<LabelId> GetLabelId(const std::string &label) const { + if (const auto it = labels.find(label); it != labels.end()) { + return it->second; + } + + return std::nullopt; + } std::string GetLabelName(const LabelId label) const { if (const auto it = @@ -220,8 +226,8 @@ struct ShardMap { } std::optional<PropertyId> GetPropertyId(const std::string &property_name) const { - if (properties.contains(property_name)) { - return properties.at(property_name); + if (const auto it = properties.find(property_name); it != properties.end()) { + return it->second; } return std::nullopt; @@ -237,8 +243,8 @@ struct ShardMap { } std::optional<EdgeTypeId> GetEdgeTypeId(const std::string &edge_type) const { - if (edge_types.contains(edge_type)) { - return edge_types.at(edge_type); + if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { + return it->second; } return std::nullopt; diff --git a/src/glue/v2/communication.cpp b/src/glue/v2/communication.cpp index f14013ae0..ffafc4a59 100644 --- a/src/glue/v2/communication.cpp +++ b/src/glue/v2/communication.cpp @@ -205,6 +205,7 @@ Value ToBoltValue(msgs::Value value) { case msgs::Value::Type::Edge: { throw utils::BasicException("Vertex and Edge not supported!"); } + // TODO Value to Date types not supported } } diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 36a194fb4..9c4d71e94 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -643,10 +643,10 @@ int main(int argc, char **argv) { const std::string label{"label"}; memgraph::coordinator::ShardMap sm; auto prop_map = sm.AllocatePropertyIds(std::vector<std::string>{property}); - auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"edge_type"}); + auto edge_type_map = sm.AllocateEdgeTypeIds(std::vector<std::string>{"TO"}); std::vector<memgraph::storage::v3::SchemaProperty> schema{{prop_map.at(property), memgraph::common::SchemaType::INT}}; sm.InitializeNewLabel(label, schema, 1, sm.shard_map_version); - sm.SplitShard(sm.GetHlc(), sm.GetLabelId(label), + sm.SplitShard(sm.GetHlc(), *sm.GetLabelId(label), std::vector<memgraph::storage::v3::PropertyValue>{memgraph::storage::v3::PropertyValue{2}}); memgraph::coordinator::Coordinator coordinator{sm}; diff --git a/src/query/v2/accessors.cpp b/src/query/v2/accessors.cpp index de64e80fb..5391f1384 100644 --- a/src/query/v2/accessors.cpp +++ b/src/query/v2/accessors.cpp @@ -14,13 +14,12 @@ #include "storage/v3/id_types.hpp" namespace memgraph::query::v2::accessors { -EdgeAccessor::EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props) - : edge(std::move(edge)), properties(std::move(props)) {} +EdgeAccessor::EdgeAccessor(Edge edge) : edge(std::move(edge)) {} -EdgeTypeId EdgeAccessor::EdgeType() const { return EdgeTypeId::FromUint(edge.type.id); } +EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; } -std::vector<std::pair<PropertyId, Value>> EdgeAccessor::Properties() const { - return properties; +const std::vector<std::pair<PropertyId, Value>> &EdgeAccessor::Properties() const { + return edge.properties; // std::map<std::string, TypedValue> res; // for (const auto &[name, value] : *properties) { // res[name] = ValueToTypedValue(value); @@ -34,7 +33,9 @@ Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const { return {}; } -Edge EdgeAccessor::GetEdge() const { return edge; } +const Edge &EdgeAccessor::GetEdge() const { return edge; } + +bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; }; VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); } @@ -45,6 +46,8 @@ VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; } +const msgs::VertexId &VertexAccessor::Id() const { return vertex.id; } + std::vector<Label> VertexAccessor::Labels() const { return vertex.labels; } bool VertexAccessor::HasLabel(Label &label) const { @@ -52,14 +55,7 @@ bool VertexAccessor::HasLabel(Label &label) const { [label](const auto &l) { return l.id == label.id; }) != vertex.labels.end(); } -std::vector<std::pair<PropertyId, Value>> VertexAccessor::Properties() const { - // std::map<std::string, TypedValue> res; - // for (const auto &[name, value] : *properties) { - // res[name] = ValueToTypedValue(value); - // } - // return res; - return properties; -} +const std::vector<std::pair<PropertyId, Value>> &VertexAccessor::Properties() const { return properties; } Value VertexAccessor::GetProperty(PropertyId prop_id) const { return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second; diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index 4ea647539..eed169b8a 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -36,59 +36,61 @@ class VertexAccessor; class EdgeAccessor final { public: - EdgeAccessor(Edge edge, std::vector<std::pair<PropertyId, Value>> props); + explicit EdgeAccessor(Edge edge); - EdgeTypeId EdgeType() const; + [[nodiscard]] EdgeTypeId EdgeType() const; - std::vector<std::pair<PropertyId, Value>> Properties() const; + [[nodiscard]] const std::vector<std::pair<PropertyId, Value>> &Properties() const; - Value GetProperty(const std::string &prop_name) const; + [[nodiscard]] Value GetProperty(const std::string &prop_name) const; - Edge GetEdge() const; + [[nodiscard]] const Edge &GetEdge() const; + + [[nodiscard]] bool IsCycle() const; // Dummy function // NOLINTNEXTLINE(readability-convert-member-functions-to-static) - inline size_t CypherId() const { return 10; } + [[nodiscard]] size_t CypherId() const { return 10; } // bool HasSrcAccessor const { return src == nullptr; } // bool HasDstAccessor const { return dst == nullptr; } - VertexAccessor To() const; - VertexAccessor From() const; + [[nodiscard]] VertexAccessor To() const; + [[nodiscard]] VertexAccessor From() const; - friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { - return lhs.edge == rhs.edge && lhs.properties == rhs.properties; - } + friend bool operator==(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { return lhs.edge == rhs.edge; } friend bool operator!=(const EdgeAccessor &lhs, const EdgeAccessor &rhs) { return !(lhs == rhs); } private: Edge edge; - std::vector<std::pair<PropertyId, Value>> properties; }; class VertexAccessor final { public: using PropertyId = msgs::PropertyId; using Label = msgs::Label; + using VertexId = msgs::VertexId; VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props); - Label PrimaryLabel() const; + [[nodiscard]] Label PrimaryLabel() const; - std::vector<Label> Labels() const; + [[nodiscard]] const msgs::VertexId &Id() const; - bool HasLabel(Label &label) const; + [[nodiscard]] std::vector<Label> Labels() const; - std::vector<std::pair<PropertyId, Value>> Properties() const; + [[nodiscard]] bool HasLabel(Label &label) const; - Value GetProperty(PropertyId prop_id) const; - Value GetProperty(const std::string &prop_name) const; + [[nodiscard]] const std::vector<std::pair<PropertyId, Value>> &Properties() const; - msgs::Vertex GetVertex() const; + [[nodiscard]] Value GetProperty(PropertyId prop_id) const; + [[nodiscard]] Value GetProperty(const std::string &prop_name) const; + + [[nodiscard]] msgs::Vertex GetVertex() const; // Dummy function // NOLINTNEXTLINE(readability-convert-member-functions-to-static) - inline size_t CypherId() const { return 10; } + [[nodiscard]] size_t CypherId() const { return 10; } // auto InEdges(storage::View view, const std::vector<storage::EdgeTypeId> &edge_types) const // -> storage::Result<decltype(iter::imap(MakeEdgeAccessor, *impl_.InEdges(view)))> { diff --git a/src/query/v2/context.hpp b/src/query/v2/context.hpp index e39ca6623..24b472438 100644 --- a/src/query/v2/context.hpp +++ b/src/query/v2/context.hpp @@ -80,7 +80,7 @@ inline std::vector<storage::v3::LabelId> NamesToLabels(const std::vector<std::st // TODO Fix by using reference if (shard_request_manager != nullptr) { for (const auto &name : label_names) { - labels.push_back(shard_request_manager->LabelNameToLabelId(name)); + labels.push_back(shard_request_manager->NameToLabel(name)); } } return labels; diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index 1f8446656..0c67c794b 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -49,7 +49,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) { case Value::Type::Vertex: return TypedValue(accessors::VertexAccessor(value.vertex_v, {})); case Value::Type::Edge: - return TypedValue(accessors::EdgeAccessor(value.edge_v, {})); + return TypedValue(accessors::EdgeAccessor(value.edge_v)); } throw std::runtime_error("Incorrect type in conversion"); } diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 92dd8648c..c02497030 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -696,6 +696,7 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_profile_query = is_profile_query; ctx_.shard_request_manager = shard_request_manager; + ctx_.edge_ids_alloc = interpreter_context->edge_ids_alloc; } std::optional<plan::ProfilingStatsWithTotalTime> PullPlan::Pull(AnyStream *stream, std::optional<int> n, diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 38a884c43..3c36559c9 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -424,6 +424,8 @@ ScanAll::ScanAll(const std::shared_ptr<LogicalOperator> &input, Symbol output_sy ACCEPT_WITH_INPUT(ScanAll) +class DistributedScanAllCursor; + UniqueCursorPtr ScanAll::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::ScanAllOperator); @@ -562,10 +564,12 @@ Expand::Expand(const std::shared_ptr<LogicalOperator> &input, Symbol input_symbo ACCEPT_WITH_INPUT(Expand) +class DistributedExpandCursor; + UniqueCursorPtr Expand::MakeCursor(utils::MemoryResource *mem) const { EventCounter::IncrementCounter(EventCounter::ExpandOperator); - return MakeUniqueCursorPtr<ExpandCursor>(mem, *this, mem); + return MakeUniqueCursorPtr<DistributedExpandCursor>(mem, *this, mem); } std::vector<Symbol> Expand::ModifiedSymbols(const SymbolTable &table) const { @@ -2387,7 +2391,7 @@ class DistributedCreateExpandCursor : public Cursor { msgs::NewExpand request{.id = {context.edge_ids_alloc.AllocateId()}}; ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, storage::v3::View::NEW); - request.type = {edge_info.edge_type.AsUint()}; + request.type = {edge_info.edge_type}; if (const auto *edge_info_properties = std::get_if<PropertiesMapList>(&edge_info.properties)) { for (const auto &[property, value_expression] : *edge_info_properties) { TypedValue val = value_expression->Accept(evaluator); @@ -2447,4 +2451,156 @@ class DistributedCreateExpandCursor : public Cursor { msgs::ExecutionState<msgs::CreateExpandRequest> state_; }; +class DistributedExpandCursor : public Cursor { + public: + explicit DistributedExpandCursor(const Expand &self, utils::MemoryResource *mem) + : self_(self), + input_cursor_(self.input_->MakeCursor(mem)), + current_in_edge_it_(current_in_edges_.begin()), + current_out_edge_it_(current_out_edges_.begin()) { + if (self_.common_.existing_node) { + throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!"); + } + } + + using VertexAccessor = accessors::VertexAccessor; + using EdgeAccessor = accessors::EdgeAccessor; + + bool InitEdges(Frame &frame, ExecutionContext &context) { + // Input Vertex could be null if it is created by a failed optional match. In + // those cases we skip that input pull and continue with the next. + + while (true) { + if (!input_cursor_->Pull(frame, context)) return false; + TypedValue &vertex_value = frame[self_.input_symbol_]; + + // Null check due to possible failed optional match. + if (vertex_value.IsNull()) continue; + + ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); + auto &vertex = vertex_value.ValueVertex(); + static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) { + switch (direction) { + case EdgeAtom::Direction::IN: + return msgs::EdgeDirection::IN; + case EdgeAtom::Direction::OUT: + return msgs::EdgeDirection::OUT; + case EdgeAtom::Direction::BOTH: + return msgs::EdgeDirection::BOTH; + } + }; + + msgs::ExpandOneRequest request; + request.direction = direction_to_msgs_direction(self_.common_.direction); + // to not fetch any properties of the edges + request.edge_properties.emplace(); + request.src_vertices.push_back(vertex.Id()); + msgs::ExecutionState<msgs::ExpandOneRequest> request_state; + auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + MG_ASSERT(result_rows.size() == 1); + auto &result_row = result_rows.front(); + + const auto convert_edges = [&vertex]( + std::vector<msgs::ExpandOneResultRow::EdgeWithSpecificProperties> &&edge_messages, + const EdgeAtom::Direction direction) { + std::vector<EdgeAccessor> edge_accessors; + edge_accessors.reserve(edge_messages.size()); + switch (direction) { + case EdgeAtom::Direction::IN: { + for (auto &edge : edge_messages) { + edge_accessors.emplace_back( + msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}); + } + break; + } + case EdgeAtom::Direction::OUT: { + for (auto &edge : edge_messages) { + edge_accessors.emplace_back( + msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}); + } + break; + } + case EdgeAtom::Direction::BOTH: { + LOG_FATAL("Must indicate exact expansion direction here"); + } + } + return edge_accessors; + }; + current_in_edges_ = + convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN); + current_in_edge_it_ = current_in_edges_.begin(); + current_in_edges_ = + convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT); + current_in_edge_it_ = current_in_edges_.begin(); + return true; + } + } + + bool Pull(Frame &frame, ExecutionContext &context) override { + SCOPED_PROFILE_OP("DistributedExpand"); + // A helper function for expanding a node from an edge. + auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) { + if (self_.common_.existing_node) return; + switch (direction) { + case EdgeAtom::Direction::IN: + frame[self_.common_.node_symbol] = new_edge.From(); + break; + case EdgeAtom::Direction::OUT: + frame[self_.common_.node_symbol] = new_edge.To(); + break; + case EdgeAtom::Direction::BOTH: + LOG_FATAL("Must indicate exact expansion direction here"); + } + }; + + while (true) { + if (MustAbort(context)) throw HintedAbortError(); + // attempt to get a value from the incoming edges + if (current_in_edge_it_ != current_in_edges_.end()) { + auto &edge = *current_in_edge_it_; + ++current_in_edge_it_; + frame[self_.common_.edge_symbol] = edge; + pull_node(edge, EdgeAtom::Direction::IN); + return true; + } + + // attempt to get a value from the outgoing edges + if (current_out_edge_it_ != current_out_edges_.end()) { + auto &edge = *current_out_edge_it_; + ++current_out_edge_it_; + if (self_.common_.direction == EdgeAtom::Direction::BOTH && edge.IsCycle()) { + continue; + }; + frame[self_.common_.edge_symbol] = edge; + pull_node(edge, EdgeAtom::Direction::OUT); + return true; + } + + // If we are here, either the edges have not been initialized, + // or they have been exhausted. Attempt to initialize the edges. + if (!InitEdges(frame, context)) return false; + + // we have re-initialized the edges, continue with the loop + } + } + + void Shutdown() override { input_cursor_->Shutdown(); } + + void Reset() override { + input_cursor_->Reset(); + current_in_edges_.clear(); + current_out_edges_.clear(); + current_in_edge_it_ = current_in_edges_.end(); + current_out_edge_it_ = current_out_edges_.end(); + } + + private: + const Expand &self_; + const UniqueCursorPtr input_cursor_; + std::vector<EdgeAccessor> current_in_edges_; + std::vector<EdgeAccessor> current_out_edges_; + std::vector<EdgeAccessor>::iterator current_in_edge_it_; + std::vector<EdgeAccessor>::iterator current_out_edge_it_; +}; + } // namespace memgraph::query::v2::plan diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index c17f0b77d..f1be8e1a1 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -31,7 +31,7 @@ class VertexCountCache { public: explicit VertexCountCache(TDbAccessor *shard_request_manager) : shard_request_manager_{shard_request_manager} {} - auto NameToLabel(const std::string &name) { return shard_request_manager_->LabelNameToLabelId(name); } + auto NameToLabel(const std::string &name) { return shard_request_manager_->NameToLabel(name); } auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); } auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); } diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 4b2588c24..d8a8ed287 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -50,7 +50,7 @@ using PropertyId = memgraph::storage::v3::PropertyId; using EdgeTypeId = memgraph::storage::v3::EdgeTypeId; struct EdgeType { - uint64_t id; + EdgeTypeId id; friend bool operator==(const EdgeType &lhs, const EdgeType &rhs) = default; }; @@ -64,7 +64,7 @@ struct EdgeId { struct Edge { VertexId src; VertexId dst; - std::optional<std::vector<std::pair<PropertyId, Value>>> properties; + std::vector<std::pair<PropertyId, Value>> properties; EdgeId id; EdgeType type; friend bool operator==(const Edge &lhs, const Edge &rhs) { return lhs.id == rhs.id; } @@ -317,20 +317,6 @@ struct Value { } }; -struct ValuesMap { - std::unordered_map<PropertyId, Value> values_map; -}; - -struct MappedValues { - std::vector<ValuesMap> values_map; -}; - -struct ListedValues { - std::vector<std::vector<Value>> properties; -}; - -using Values = std::variant<ListedValues, MappedValues>; - struct Expression { std::string expression; }; @@ -350,7 +336,9 @@ enum class StorageView { OLD = 0, NEW = 1 }; struct ScanVerticesRequest { Hlc transaction_id; + // This should be optional VertexId start_id; + // The empty optional means return all of the properties, while an empty list means do not return any properties std::optional<std::vector<PropertyId>> props_to_return; // expression that determines if vertex is returned or not std::vector<std::string> filter_expressions; @@ -366,6 +354,7 @@ struct ScanVerticesRequest { struct ScanResultRow { Vertex vertex; // empty() is no properties returned + // This should be changed to std::map<PropertyId, Value> std::vector<std::pair<PropertyId, Value>> props; std::vector<Value> evaluated_vertex_expressions; }; @@ -380,6 +369,7 @@ using VertexOrEdgeIds = std::variant<VertexId, EdgeId>; struct GetPropertiesRequest { Hlc transaction_id; + // Shouldn't contain mixed vertex and edge ids VertexOrEdgeIds vertex_or_edge_ids; std::vector<PropertyId> property_ids; std::vector<Expression> expressions; @@ -391,44 +381,48 @@ struct GetPropertiesRequest { struct GetPropertiesResponse { bool success; - Values values; }; enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 }; -struct VertexEdgeId { - VertexId vertex_id; - std::optional<EdgeId> next_id; -}; - struct ExpandOneRequest { + // TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge? Hlc transaction_id; std::vector<VertexId> src_vertices; + // return types that type is in this list + // empty means all the types std::vector<EdgeType> edge_types; - EdgeDirection direction; + EdgeDirection direction{EdgeDirection::OUT}; + // Wether to return multiple edges between the same neighbors bool only_unique_neighbor_rows = false; - // The empty optional means return all of the properties, while an empty - // list means do not return any properties - // TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object - // after schema is implemented - // Special values are accepted: - // * __mg__labels + // The empty optional means return all of the properties, while an empty list means do not return any properties std::optional<std::vector<PropertyId>> src_vertex_properties; - // TODO(antaljanosbenjamin): All of the special values should be communicated through a single vertex object - // after schema is implemented - // Special values are accepted: - // * __mg__dst_id (Vertex, but without labels) - // * __mg__type (binary) + // The empty optional means return all of the properties, while an empty list means do not return any properties std::optional<std::vector<PropertyId>> edge_properties; // QUESTION(antaljanosbenjamin): Maybe also add possibility to expressions evaluated on the source vertex? // List of expressions evaluated on edges std::vector<Expression> expressions; std::optional<std::vector<OrderBy>> order_by; + // Limit the edges or the vertices? std::optional<size_t> limit; std::optional<Filter> filter; }; struct ExpandOneResultRow { + struct EdgeWithAllProperties { + VertexId other_end; + EdgeType type; + Gid gid; + std::map<PropertyId, Value> properties; + }; + + struct EdgeWithSpecificProperties { + VertexId other_end; + EdgeType type; + Gid gid; + std::vector<Value> properties; + }; + // NOTE: This struct could be a single Values with columns something like this: // src_vertex(Vertex), vertex_prop1(Value), vertex_prop2(Value), edges(list<Value>) // where edges might be a list of: @@ -437,15 +431,17 @@ struct ExpandOneResultRow { // The drawback of this is currently the key of the map is always interpreted as a string in Value, not as an // integer, which should be in case of mapped properties. Vertex src_vertex; - std::optional<std::map<PropertyId, Value>> src_vertex_properties; + std::map<PropertyId, Value> src_vertex_properties; // NOTE: If the desired edges are specified in the request, // edges_with_specific_properties will have a value and it will // return the properties as a vector of property values. The order // of the values returned should be the same as the PropertyIds // were defined in the request. - std::optional<std::vector<std::tuple<VertexId, Gid, std::map<PropertyId, Value>>>> edges_with_all_properties; - std::optional<std::vector<std::tuple<VertexId, Gid, std::vector<Value>>>> edges_with_specific_properties; + std::vector<EdgeWithAllProperties> in_edges_with_all_properties; + std::vector<EdgeWithSpecificProperties> in_edges_with_specific_properties; + std::vector<EdgeWithAllProperties> out_edges_with_all_properties; + std::vector<EdgeWithSpecificProperties> out_edges_with_specific_properties; }; struct ExpandOneResponse { @@ -455,6 +451,7 @@ struct ExpandOneResponse { struct UpdateVertexProp { PrimaryKey primary_key; + // This should be a map std::vector<std::pair<PropertyId, Value>> property_updates; }; @@ -462,6 +459,7 @@ struct UpdateEdgeProp { EdgeId edge_id; VertexId src; VertexId dst; + // This should be a map std::vector<std::pair<PropertyId, Value>> property_updates; }; @@ -471,12 +469,7 @@ struct UpdateEdgeProp { struct NewVertex { std::vector<Label> label_ids; PrimaryKey primary_key; - std::vector<std::pair<PropertyId, Value>> properties; -}; - -struct NewVertexLabel { - std::string label; - PrimaryKey primary_key; + // This should be a map std::vector<std::pair<PropertyId, Value>> properties; }; diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 5e6ef60e5..be8ca6852 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -14,7 +14,9 @@ #include <chrono> #include <deque> #include <iostream> +#include <iterator> #include <map> +#include <numeric> #include <optional> #include <random> #include <set> @@ -116,14 +118,14 @@ class ShardRequestManagerInterface { virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0; virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state, std::vector<NewVertex> new_vertices) = 0; - virtual std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) = 0; + virtual std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, + ExpandOneRequest request) = 0; virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_edges) = 0; - // TODO(antaljanosbenjamin): unify the GetXXXId and NameToId functions to have consistent naming, return type and - // implementation + virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; - virtual storage::v3::LabelId LabelNameToLabelId(const std::string &name) const = 0; + virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0; virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; @@ -209,15 +211,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { } storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override { - return *shards_map_.GetEdgeTypeId(name); + return shards_map_.GetEdgeTypeId(name).value(); } storage::v3::PropertyId NameToProperty(const std::string &name) const override { - return *shards_map_.GetPropertyId(name); + return shards_map_.GetPropertyId(name).value(); } - storage::v3::LabelId LabelNameToLabelId(const std::string &name) const override { - return shards_map_.GetLabelId(name); + storage::v3::LabelId NameToLabel(const std::string &name) const override { + return shards_map_.GetLabelId(name).value(); } const std::string &PropertyToName(memgraph::storage::v3::PropertyId /*prop*/) const override { @@ -317,13 +319,13 @@ class ShardRequestManager : public ShardRequestManagerInterface { return responses; } - std::vector<ExpandOneResponse> Request(ExecutionState<ExpandOneRequest> &state) override { + std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override { // TODO(kostasrim)Update to limit the batch size here // Expansions of the destination must be handled by the caller. For example // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) // For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties // must be fetched again with an ExpandOne(Edges.dst) - MaybeInitializeExecutionState(state); + MaybeInitializeExecutionState(state, std::move(request)); std::vector<ExpandOneResponse> responses; auto &shard_cache_ref = state.shard_cache; @@ -334,9 +336,18 @@ class ShardRequestManager : public ShardRequestManagerInterface { do { AwaitOnResponses(state, responses); } while (!state.shard_cache.empty()); + std::vector<ExpandOneResultRow> result_rows; + const auto total_row_count = std::accumulate( + responses.begin(), responses.end(), 0, + [](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); }); + result_rows.reserve(total_row_count); + for (auto &response : responses) { + result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()), + std::make_move_iterator(response.result.end())); + } MaybeCompleteState(state); - return responses; + return result_rows; } private: @@ -455,7 +466,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { state.state = ExecutionState<ScanVerticesRequest>::EXECUTING; } - void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state) { + void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) { ThrowIfStateCompleted(state); if (ShallNotInitializeState(state)) { return; @@ -463,24 +474,18 @@ class ShardRequestManager : public ShardRequestManagerInterface { state.transaction_id = transaction_id_; std::map<Shard, ExpandOneRequest> per_shard_request_table; - MG_ASSERT(state.requests.size() == 1); - auto top_level_rqst = std::move(*state.requests.begin()); - auto top_level_rqst_template = top_level_rqst; + auto top_level_rqst_template = request; + top_level_rqst_template.transaction_id = transaction_id_; top_level_rqst_template.src_vertices.clear(); - top_level_rqst_template.edge_types.clear(); state.requests.clear(); - size_t id = 0; - for (const auto &vertex : top_level_rqst.src_vertices) { + for (auto &vertex : request.src_vertices) { auto shard = shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); if (!per_shard_request_table.contains(shard)) { - ExpandOneRequest expand_v_rqst = top_level_rqst_template; - per_shard_request_table.insert(std::pair(shard, std::move(expand_v_rqst))); + per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); state.shard_cache.push_back(shard); } per_shard_request_table[shard].src_vertices.push_back(vertex); - per_shard_request_table[shard].edge_types.push_back(top_level_rqst.edge_types[id]); - ++id; } for (auto &[shard, rqst] : per_shard_request_table) { @@ -545,10 +550,11 @@ class ShardRequestManager : public ShardRequestManagerInterface { void SendAllRequests(ExecutionState<ExpandOneRequest> &state, std::vector<memgraph::coordinator::Shard> &shard_cache_ref) { size_t id = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { + for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { auto &storage_client = GetStorageClientForShard(*shard_it); ReadRequests req = state.requests[id]; storage_client.SendAsyncReadRequest(req); + ++id; } } @@ -593,13 +599,13 @@ class ShardRequestManager : public ShardRequestManagerInterface { auto &shard_cache_ref = state.shard_cache; int64_t request_idx = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++request_idx) { - auto &storage_client = GetStorageClientForShard( - *state.label, - storage::conversions::ConvertPropertyVector(state.requests[request_idx].src_vertices[0].second)); + for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { + auto &storage_client = GetStorageClientForShard(*shard_it); auto poll_result = storage_client.PollAsyncReadRequest(); if (!poll_result) { + ++shard_it; + ++request_idx; continue; } @@ -622,7 +628,6 @@ class ShardRequestManager : public ShardRequestManagerInterface { // Needed to maintain the 1-1 mapping between the ShardCache and the requests. auto it = state.requests.begin() + request_idx; state.requests.erase(it); - --request_idx; } } diff --git a/src/storage/v3/bindings/db_accessor.hpp b/src/storage/v3/bindings/db_accessor.hpp index 405c41a9f..1424c1542 100644 --- a/src/storage/v3/bindings/db_accessor.hpp +++ b/src/storage/v3/bindings/db_accessor.hpp @@ -83,7 +83,7 @@ class DbAccessor final { } storage::v3::ResultSchema<VertexAccessor> InsertVertexAndValidate( - storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels, + const storage::v3::LabelId primary_label, const std::vector<storage::v3::LabelId> &labels, const std::vector<std::pair<storage::v3::PropertyId, storage::v3::PropertyValue>> &properties) { auto maybe_vertex_acc = accessor_->CreateVertexAndValidate(primary_label, labels, properties); if (maybe_vertex_acc.HasError()) { diff --git a/src/storage/v3/shard.hpp b/src/storage/v3/shard.hpp index db54741da..e866311de 100644 --- a/src/storage/v3/shard.hpp +++ b/src/storage/v3/shard.hpp @@ -342,6 +342,8 @@ class Shard final { LabelId PrimaryLabel() const; + [[nodiscard]] bool IsVertexBelongToShard(const VertexId &vertex_id) const; + /// @throw std::bad_alloc bool CreateIndex(LabelId label, std::optional<uint64_t> desired_commit_timestamp = {}); @@ -376,8 +378,6 @@ class Shard final { uint64_t CommitTimestamp(std::optional<uint64_t> desired_commit_timestamp = {}); - [[nodiscard]] bool IsVertexBelongToShard(const VertexId &vertex_id) const; - // Main object storage NameIdMapper name_id_mapper_; LabelId primary_label_; diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index e50318d25..1e5609b23 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -33,26 +33,26 @@ #include "storage/v3/storage.hpp" #include "storage/v3/value_conversions.hpp" #include "storage/v3/vertex_accessor.hpp" +#include "storage/v3/vertex_id.hpp" #include "storage/v3/view.hpp" -using memgraph::msgs::Label; -using memgraph::msgs::PropertyId; -using memgraph::msgs::Value; -using memgraph::msgs::Vertex; -using memgraph::msgs::VertexId; - -using memgraph::storage::conversions::ConvertPropertyVector; -using memgraph::storage::conversions::ConvertValueVector; -using memgraph::storage::conversions::FromPropertyValueToValue; -using memgraph::storage::conversions::ToPropertyValue; -using memgraph::storage::v3::View; - namespace memgraph::storage::v3 { +using msgs::Label; +using msgs::PropertyId; +using msgs::Value; + +using conversions::ConvertPropertyVector; +using conversions::ConvertValueVector; +using conversions::FromPropertyValueToValue; +using conversions::ToMsgsVertexId; +using conversions::ToPropertyValue; namespace { -std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ConvertPropertyMap( +namespace msgs = msgs; + +std::vector<std::pair<PropertyId, PropertyValue>> ConvertPropertyMap( std::vector<std::pair<PropertyId, Value>> &&properties) { - std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3::PropertyValue>> ret; + std::vector<std::pair<PropertyId, PropertyValue>> ret; ret.reserve(properties.size()); std::transform(std::make_move_iterator(properties.begin()), std::make_move_iterator(properties.end()), @@ -63,9 +63,8 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, memgraph::storage::v3:: return ret; } -std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap( - const std::map<PropertyId, Value> &properties) { - std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> ret; +std::vector<std::pair<PropertyId, Value>> FromMap(const std::map<PropertyId, Value> &properties) { + std::vector<std::pair<PropertyId, Value>> ret; ret.reserve(properties.size()); std::transform(properties.begin(), properties.end(), std::back_inserter(ret), @@ -74,9 +73,9 @@ std::vector<std::pair<memgraph::storage::v3::PropertyId, Value>> FromMap( return ret; } -std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor( - const memgraph::storage::v3::VertexAccessor &acc, const std::vector<memgraph::storage::v3::PropertyId> &props, - View view) { +std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, + const std::vector<PropertyId> &props, + View view) { std::map<PropertyId, Value> ret; for (const auto &prop : props) { @@ -86,19 +85,14 @@ std::optional<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor return std::nullopt; } auto &value = result.GetValue(); - if (value.IsNull()) { - spdlog::debug("The specified property does not exist but it should"); - return std::nullopt; - } - ret.emplace(std::make_pair(prop, FromPropertyValueToValue(value))); + ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); } return ret; } -std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor( - const memgraph::storage::v3::VertexAccessor &acc, memgraph::storage::v3::View view, - const memgraph::storage::v3::Schemas::Schema *schema) { +std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema *schema) { std::map<PropertyId, Value> ret; auto props = acc.Properties(view); if (props.HasError()) { @@ -106,36 +100,40 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor( return std::nullopt; } - const auto &properties = props.GetValue(); - std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), [](const auto &property) { - return std::make_pair(property.first, FromPropertyValueToValue(property.second)); - }); + auto &properties = props.GetValue(); + std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), + [](std::pair<const PropertyId, PropertyValue> &pair) { + return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); + }); + properties.clear(); + // TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this + // code. auto maybe_pk = acc.PrimaryKey(view); if (maybe_pk.HasError()) { spdlog::debug("Encountered an error while trying to get vertex primary key."); } - const auto pk = maybe_pk.GetValue(); + auto &pk = maybe_pk.GetValue(); MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!"); for (size_t i{0}; i < schema->second.size(); ++i) { - ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(pk[i])); + ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); } return ret; } -memgraph::msgs::Value ConstructValueVertex(const memgraph::storage::v3::VertexAccessor &acc, View view) { +msgs::Value ConstructValueVertex(const VertexAccessor &acc, View view) { // Get the vertex id auto prim_label = acc.PrimaryLabel(view).GetValue(); - memgraph::msgs::Label value_label{.id = prim_label}; + msgs::Label value_label{.id = prim_label}; auto prim_key = ConvertValueVector(acc.PrimaryKey(view).GetValue()); - memgraph::msgs::VertexId vertex_id = std::make_pair(value_label, prim_key); + msgs::VertexId vertex_id = std::make_pair(value_label, prim_key); // Get the labels auto vertex_labels = acc.Labels(view).GetValue(); - std::vector<memgraph::msgs::Label> value_labels; + std::vector<msgs::Label> value_labels; value_labels.reserve(vertex_labels.size()); std::transform(vertex_labels.begin(), vertex_labels.end(), std::back_inserter(value_labels), @@ -144,37 +142,37 @@ memgraph::msgs::Value ConstructValueVertex(const memgraph::storage::v3::VertexAc return Value({.id = vertex_id, .labels = value_labels}); } -Value ConstructValueEdge(const memgraph::storage::v3::EdgeAccessor &acc, View view) { - memgraph::msgs::EdgeType type = {.id = acc.EdgeType().AsUint()}; - memgraph::msgs::EdgeId gid = {.gid = acc.Gid().AsUint()}; +Value ConstructValueEdge(const EdgeAccessor &acc, View view) { + msgs::EdgeType type = {.id = acc.EdgeType()}; + msgs::EdgeId gid = {.gid = acc.Gid().AsUint()}; Label src_prim_label = {.id = acc.FromVertex().primary_label}; - memgraph::msgs::VertexId src_vertex = - std::make_pair(src_prim_label, ConvertValueVector(acc.FromVertex().primary_key)); + msgs::VertexId src_vertex = std::make_pair(src_prim_label, ConvertValueVector(acc.FromVertex().primary_key)); Label dst_prim_label = {.id = acc.ToVertex().primary_label}; - memgraph::msgs::VertexId dst_vertex = std::make_pair(dst_prim_label, ConvertValueVector(acc.ToVertex().primary_key)); + msgs::VertexId dst_vertex = std::make_pair(dst_prim_label, ConvertValueVector(acc.ToVertex().primary_key)); - std::optional<std::vector<std::pair<PropertyId, Value>>> properties_opt = {}; - const auto &properties = acc.Properties(view); + auto properties = acc.Properties(view); + std::vector<std::pair<PropertyId, Value>> present_properties; if (properties.HasValue()) { - const auto &props = properties.GetValue(); - std::vector<std::pair<PropertyId, Value>> present_properties; + auto &props = properties.GetValue(); present_properties.reserve(props.size()); std::transform(props.begin(), props.end(), std::back_inserter(present_properties), - [](const auto &prop) { return std::make_pair(prop.first, FromPropertyValueToValue(prop.second)); }); - - properties_opt = std::move(present_properties); + [](std::pair<const PropertyId, PropertyValue> &prop) { + return std::make_pair(prop.first, FromPropertyValueToValue(std::move(prop.second))); + }); } - return Value({.src = src_vertex, .dst = dst_vertex, .properties = properties_opt, .id = gid, .type = type}); + return Value(msgs::Edge{.src = std::move(src_vertex), + .dst = std::move(dst_vertex), + .properties = std::move(present_properties), + .id = gid, + .type = type}); } -Value FromTypedValueToValue(memgraph::storage::v3::TypedValue &&tv) { - using memgraph::storage::v3::TypedValue; - +Value FromTypedValueToValue(TypedValue &&tv) { switch (tv.type()) { case TypedValue::Type::Bool: return Value(tv.ValueBool()); @@ -219,7 +217,7 @@ Value FromTypedValueToValue(memgraph::storage::v3::TypedValue &&tv) { return Value{}; } -std::vector<Value> ConvertToValueVectorFromTypedValueVector(std::vector<memgraph::storage::v3::TypedValue> &&vec) { +std::vector<Value> ConvertToValueVectorFromTypedValueVector(std::vector<TypedValue> &&vec) { std::vector<Value> ret; ret.reserve(vec.size()); @@ -238,9 +236,8 @@ std::vector<PropertyId> NamesToProperties(const std::vector<std::string> &proper return properties; } -std::vector<memgraph::storage::v3::LabelId> NamesToLabels(const std::vector<std::string> &label_names, - DbAccessor &dba) { - std::vector<memgraph::storage::v3::LabelId> labels; +std::vector<LabelId> NamesToLabels(const std::vector<std::string> &label_names, DbAccessor &dba) { + std::vector<LabelId> labels; labels.reserve(label_names.size()); for (const auto &name : label_names) { labels.push_back(dba.NameToLabel(name)); @@ -249,8 +246,7 @@ std::vector<memgraph::storage::v3::LabelId> NamesToLabels(const std::vector<std: } template <class TExpression> -auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage, - memgraph::storage::v3::ExpressionEvaluator &eval, DbAccessor &dba) { +auto Eval(TExpression *expr, EvaluationContext &ctx, AstStorage &storage, ExpressionEvaluator &eval, DbAccessor &dba) { ctx.properties = NamesToProperties(storage.properties_, dba); ctx.labels = NamesToLabels(storage.labels_, dba); auto value = expr->Accept(eval); @@ -266,9 +262,9 @@ std::any ParseExpression(const std::string &expr, memgraph::expr::AstStorage &st return visitor.visit(ast); } -TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, - const std::optional<memgraph::storage::v3::EdgeAccessor> &e_acc, - const std::string &expression, std::string_view node_name, std::string_view edge_name) { +TypedValue ComputeExpression(DbAccessor &dba, const std::optional<VertexAccessor> &v_acc, + const std::optional<EdgeAccessor> &e_acc, const std::string &expression, + std::string_view node_name, std::string_view edge_name) { AstStorage storage; Frame frame{1 + 1}; // 1 for the node_identifier, 1 for the edge_identifier SymbolTable symbol_table; @@ -308,18 +304,18 @@ TypedValue ComputeExpression(DbAccessor &dba, const std::optional<memgraph::stor return Eval(std::any_cast<Expression *>(expr), ctx, storage, eval, dba); } -bool FilterOnVertex(DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc, - const std::vector<std::string> &filters, const std::string_view node_name) { +bool FilterOnVertex(DbAccessor &dba, const VertexAccessor &v_acc, const std::vector<std::string> &filters, + const std::string_view node_name) { return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); return res.IsBool() && res.ValueBool(); }); } -std::vector<memgraph::storage::v3::TypedValue> EvaluateVertexExpressions( - DbAccessor &dba, const memgraph::storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &expressions, - std::string_view node_name) { - std::vector<memgraph::storage::v3::TypedValue> evaluated_expressions; +std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, + const std::vector<std::string> &expressions, + std::string_view node_name) { + std::vector<TypedValue> evaluated_expressions; evaluated_expressions.reserve(expressions.size()); std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), @@ -330,24 +326,21 @@ std::vector<memgraph::storage::v3::TypedValue> EvaluateVertexExpressions( return evaluated_expressions; } -bool DoesEdgeTypeMatch(const memgraph::msgs::ExpandOneRequest &req, const memgraph::storage::v3::EdgeAccessor &edge) { +bool DoesEdgeTypeMatch(const std::vector<msgs::EdgeType> &edge_types, const EdgeAccessor &edge) { // TODO(gvolfing) This should be checked only once and handled accordingly. - if (req.edge_types.empty()) { + if (edge_types.empty()) { return true; } - return std::ranges::any_of(req.edge_types.cbegin(), req.edge_types.cend(), - [&edge](const memgraph::msgs::EdgeType &edge_type) { - return memgraph::storage::v3::EdgeTypeId::FromUint(edge_type.id) == edge.EdgeType(); - }); + return std::ranges::any_of(edge_types.begin(), edge_types.end(), + [&edge](const msgs::EdgeType &edge_type) { return edge_type.id == edge.EdgeType(); }); } struct LocalError {}; -std::optional<memgraph::msgs::Vertex> FillUpSourceVertex( - const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req, - memgraph::msgs::VertexId src_vertex) { - auto secondary_labels = v_acc->Labels(View::OLD); +std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccessor> &v_acc, + const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) { + auto secondary_labels = v_acc->Labels(View::NEW); if (secondary_labels.HasError()) { spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", req.transaction_id.logical_id); @@ -355,22 +348,22 @@ std::optional<memgraph::msgs::Vertex> FillUpSourceVertex( } auto &sec_labels = secondary_labels.GetValue(); - memgraph::msgs::Vertex source_vertex; + msgs::Vertex source_vertex; source_vertex.id = src_vertex; source_vertex.labels.reserve(sec_labels.size()); std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels), - [](auto label_id) { return memgraph::msgs::Label{.id = label_id}; }); + [](auto label_id) { return msgs::Label{.id = label_id}; }); return source_vertex; } -std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties( - const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) { +std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc, + const msgs::ExpandOneRequest &req) { std::map<PropertyId, Value> src_vertex_properties; if (!req.src_vertex_properties) { - auto props = v_acc->Properties(View::OLD); + auto props = v_acc->Properties(View::NEW); if (props.HasError()) { spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", req.transaction_id.logical_id); @@ -378,31 +371,34 @@ std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties( } for (auto &[key, val] : props.GetValue()) { - src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(val))); + src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); } } else if (req.src_vertex_properties.value().empty()) { // NOOP } else { - auto &vertex_props = req.src_vertex_properties.value(); - std::transform(vertex_props.begin(), vertex_props.end(), - std::inserter(src_vertex_properties, src_vertex_properties.begin()), [&v_acc](const auto &prop) { - const auto &prop_val = v_acc->GetProperty(prop, View::OLD); - return std::make_pair(prop, FromPropertyValueToValue(prop_val.GetValue())); - }); + for (const auto &prop : req.src_vertex_properties.value()) { + auto prop_val = v_acc->GetProperty(prop, View::OLD); + if (prop_val.HasError()) { + spdlog::debug("Encountered an error while trying to access vertex properties. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + src_vertex_properties.insert(std::make_pair(prop, FromPropertyValueToValue(std::move(prop_val.GetValue())))); + } } return src_vertex_properties; } -std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> FillUpConnectingEdges( - const std::optional<memgraph::storage::v3::VertexAccessor> &v_acc, memgraph::msgs::ExpandOneRequest &req) { - std::vector<memgraph::storage::v3::EdgeAccessor> in_edges; - std::vector<memgraph::storage::v3::EdgeAccessor> out_edges; +std::optional<std::array<std::vector<EdgeAccessor>, 2>> FillUpConnectingEdges( + const std::optional<VertexAccessor> &v_acc, const msgs::ExpandOneRequest &req) { + std::vector<EdgeAccessor> in_edges; + std::vector<EdgeAccessor> out_edges; switch (req.direction) { - case memgraph::msgs::EdgeDirection::OUT: { - auto out_edges_result = v_acc->OutEdges(View::OLD); + case msgs::EdgeDirection::OUT: { + auto out_edges_result = v_acc->OutEdges(View::NEW); if (out_edges_result.HasError()) { spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", req.transaction_id.logical_id); @@ -411,8 +407,8 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F out_edges = std::move(out_edges_result.GetValue()); break; } - case memgraph::msgs::EdgeDirection::IN: { - auto in_edges_result = v_acc->InEdges(View::OLD); + case msgs::EdgeDirection::IN: { + auto in_edges_result = v_acc->InEdges(View::NEW); if (in_edges_result.HasError()) { spdlog::debug( "Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}"[req.transaction_id @@ -422,8 +418,8 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F in_edges = std::move(in_edges_result.GetValue()); break; } - case memgraph::msgs::EdgeDirection::BOTH: { - auto in_edges_result = v_acc->InEdges(View::OLD); + case msgs::EdgeDirection::BOTH: { + auto in_edges_result = v_acc->InEdges(View::NEW); if (in_edges_result.HasError()) { spdlog::debug("Encountered an error while trying to get in-going EdgeAccessors. Transaction id: {}", req.transaction_id.logical_id); @@ -431,7 +427,7 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F } in_edges = std::move(in_edges_result.GetValue()); - auto out_edges_result = v_acc->OutEdges(View::OLD); + auto out_edges_result = v_acc->OutEdges(View::NEW); if (out_edges_result.HasError()) { spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", req.transaction_id.logical_id); @@ -441,115 +437,93 @@ std::optional<std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>> F break; } } - return std::array<std::vector<memgraph::storage::v3::EdgeAccessor>, 2>{in_edges, out_edges}; + return std::array<std::vector<EdgeAccessor>, 2>{in_edges, out_edges}; } -using AllEdgePropertyDataSructure = std::map<PropertyId, memgraph::msgs::Value>; -using SpecificEdgePropertyDataSructure = std::vector<memgraph::msgs::Value>; +using AllEdgePropertyDataSructure = std::map<PropertyId, msgs::Value>; +using SpecificEdgePropertyDataSructure = std::vector<msgs::Value>; -using AllEdgeProperties = std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, AllEdgePropertyDataSructure>; -using SpecificEdgeProperties = - std::tuple<memgraph::msgs::VertexId, memgraph::msgs::Gid, SpecificEdgePropertyDataSructure>; +using AllEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, AllEdgePropertyDataSructure>; +using SpecificEdgeProperties = std::tuple<msgs::VertexId, msgs::Gid, SpecificEdgePropertyDataSructure>; using SpecificEdgePropertiesVector = std::vector<SpecificEdgeProperties>; using AllEdgePropertiesVector = std::vector<AllEdgeProperties>; -template <typename ReturnType, typename EdgeProperties, typename EdgePropertyDataStructure, typename Functor> -std::optional<ReturnType> GetEdgesWithProperties(const std::vector<memgraph::storage::v3::EdgeAccessor> &edges, - const memgraph::msgs::ExpandOneRequest &req, - Functor get_edge_properties) { - ReturnType ret; - ret.reserve(edges.size()); +using EdgeFiller = std::function<bool(const EdgeAccessor &edge, bool is_in_edge, msgs::ExpandOneResultRow &result_row)>; +template <bool are_in_edges> +bool FillEdges(const std::vector<EdgeAccessor> &edges, const msgs::ExpandOneRequest &req, msgs::ExpandOneResultRow &row, + const EdgeFiller &edge_filler) { for (const auto &edge : edges) { - if (!DoesEdgeTypeMatch(req, edge)) { + if (!DoesEdgeTypeMatch(req.edge_types, edge)) { continue; } - EdgeProperties ret_tuple; - - memgraph::msgs::Label label; - label.id = edge.FromVertex().primary_label; - memgraph::msgs::VertexId other_vertex = std::make_pair(label, ConvertValueVector(edge.FromVertex().primary_key)); - - const auto edge_props_var = get_edge_properties(edge); - - if (std::get_if<LocalError>(&edge_props_var) != nullptr) { - return std::nullopt; - } - - auto edge_props = std::get<EdgePropertyDataStructure>(edge_props_var); - memgraph::msgs::Gid gid = edge.Gid().AsUint(); - - ret.emplace_back(EdgeProperties{other_vertex, gid, edge_props}); - } - - return ret; -} - -template <typename TPropertyValue, typename TPropertyNullopt> -void SetFinalEdgeProperties(std::optional<TPropertyValue> &properties_to_value, - std::optional<TPropertyNullopt> &properties_to_nullopt, const TPropertyValue &ret_out, - const TPropertyValue &ret_in, const memgraph::msgs::ExpandOneRequest &req) { - switch (req.direction) { - case memgraph::msgs::EdgeDirection::OUT: { - properties_to_value = std::move(ret_out); - break; - } - case memgraph::msgs::EdgeDirection::IN: { - properties_to_value = std::move(ret_in); - break; - } - case memgraph::msgs::EdgeDirection::BOTH: { - TPropertyValue ret; - ret.resize(ret_out.size() + ret_in.size()); - ret.insert(ret.end(), std::make_move_iterator(ret_in.begin()), std::make_move_iterator(ret_in.end())); - ret.insert(ret.end(), std::make_move_iterator(ret_out.begin()), std::make_move_iterator(ret_out.end())); - - properties_to_value = ret; - break; + if (!edge_filler(edge, are_in_edges, row)) { + return false; } } - properties_to_nullopt = {}; + + return true; } -std::optional<memgraph::msgs::ExpandOneResultRow> GetExpandOneResult(memgraph::storage::v3::Shard::Accessor &acc, - memgraph::msgs::VertexId src_vertex, - memgraph::msgs::ExpandOneRequest req) { - using EdgeProperties = - std::variant<LocalError, std::map<PropertyId, memgraph::msgs::Value>, std::vector<memgraph::msgs::Value>>; - std::function<EdgeProperties(const memgraph::storage::v3::EdgeAccessor &)> get_edge_properties; +std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(Shard::Accessor &acc, msgs::VertexId src_vertex, + const msgs::ExpandOneRequest &req) { + EdgeFiller edge_filler; if (!req.edge_properties) { - get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) -> EdgeProperties { - std::map<PropertyId, memgraph::msgs::Value> ret; - auto property_results = edge.Properties(View::OLD); - if (property_results.HasError()) { - spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", - req.transaction_id.logical_id); - return LocalError{}; + edge_filler = [transaction_id = req.transaction_id.logical_id](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + auto properties_results = edge.Properties(View::NEW); + if (properties_results.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", transaction_id); + return false; } - for (const auto &[prop_key, prop_val] : property_results.GetValue()) { - ret.insert(std::make_pair(prop_key, FromPropertyValueToValue(prop_val))); + std::map<PropertyId, msgs::Value> value_properties; + for (auto &[prop_key, prop_val] : properties_results.GetValue()) { + value_properties.insert(std::make_pair(prop_key, FromPropertyValueToValue(std::move(prop_val)))); } - return ret; + using EdgeWithAllProperties = msgs::ExpandOneResultRow::EdgeWithAllProperties; + EdgeWithAllProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_all_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_all_properties.push_back(std::move(edges)); + } + return true; }; } else { // TODO(gvolfing) - do we want to set the action_successful here? - get_edge_properties = [&req](const memgraph::storage::v3::EdgeAccessor &edge) { - std::vector<memgraph::msgs::Value> ret; - ret.reserve(req.edge_properties.value().size()); + edge_filler = [&req](const EdgeAccessor &edge, const bool is_in_edge, + msgs::ExpandOneResultRow &result_row) -> bool { + std::vector<msgs::Value> value_properties; + value_properties.reserve(req.edge_properties.value().size()); for (const auto &edge_prop : req.edge_properties.value()) { - // TODO(gvolfing) maybe check for the absence of certain properties - ret.emplace_back(FromPropertyValueToValue(edge.GetProperty(edge_prop, View::OLD).GetValue())); + auto property_result = edge.GetProperty(edge_prop, View::NEW); + if (property_result.HasError()) { + spdlog::debug("Encountered an error while trying to get edge properties. Transaction id: {}", + req.transaction_id.logical_id); + return false; + } + value_properties.emplace_back(FromPropertyValueToValue(std::move(property_result.GetValue()))); } - return ret; + using EdgeWithSpecificProperties = msgs::ExpandOneResultRow::EdgeWithSpecificProperties; + EdgeWithSpecificProperties edges{ToMsgsVertexId(edge.FromVertex()), msgs::EdgeType{edge.EdgeType()}, + edge.Gid().AsUint(), std::move(value_properties)}; + if (is_in_edge) { + result_row.in_edges_with_specific_properties.push_back(std::move(edges)); + } else { + result_row.out_edges_with_specific_properties.push_back(std::move(edges)); + } + return true; }; } /// Fill up source vertex - auto v_acc = acc.FindVertex(ConvertPropertyVector(std::move(src_vertex.second)), View::OLD); + const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second)); + auto v_acc = acc.FindVertex(primary_key, View::NEW); auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex); if (!source_vertex) { @@ -570,55 +544,19 @@ std::optional<memgraph::msgs::ExpandOneResultRow> GetExpandOneResult(memgraph::s auto [in_edges, out_edges] = fill_up_connecting_edges.value(); - /// Assemble the edge properties - std::optional<AllEdgePropertiesVector> edges_with_all_properties; - std::optional<SpecificEdgePropertiesVector> edges_with_specific_properties; - - if (!req.edge_properties) { - auto ret_in_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>( - in_edges, req, get_edge_properties); - if (!ret_in_opt) { - return std::nullopt; - } - - auto ret_out_opt = GetEdgesWithProperties<AllEdgePropertiesVector, AllEdgeProperties, AllEdgePropertyDataSructure>( - out_edges, req, get_edge_properties); - if (!ret_out_opt) { - return std::nullopt; - } - - auto &ret_in = *ret_in_opt; - auto &ret_out = *ret_out_opt; - - SetFinalEdgeProperties<AllEdgePropertiesVector, SpecificEdgePropertiesVector>( - edges_with_all_properties, edges_with_specific_properties, ret_out, ret_in, req); - } else { - auto ret_in_opt = - GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>( - in_edges, req, get_edge_properties); - if (!ret_in_opt) { - return std::nullopt; - } - - auto ret_out_opt = - GetEdgesWithProperties<SpecificEdgePropertiesVector, SpecificEdgeProperties, SpecificEdgePropertyDataSructure>( - out_edges, req, get_edge_properties); - if (!ret_out_opt) { - return std::nullopt; - } - - auto &ret_in = *ret_in_opt; - auto &ret_out = *ret_out_opt; - - SetFinalEdgeProperties<SpecificEdgePropertiesVector, AllEdgePropertiesVector>( - edges_with_specific_properties, edges_with_all_properties, ret_out, ret_in, req); + msgs::ExpandOneResultRow result_row; + result_row.src_vertex = std::move(*source_vertex); + result_row.src_vertex_properties = std::move(*src_vertex_properties); + static constexpr bool kInEdges = true; + static constexpr bool kOutEdges = false; + if (!in_edges.empty() && !FillEdges<kInEdges>(in_edges, req, result_row, edge_filler)) { + return std::nullopt; + } + if (!out_edges.empty() && !FillEdges<kOutEdges>(out_edges, req, result_row, edge_filler)) { + return std::nullopt; } - return memgraph::msgs::ExpandOneResultRow{ - .src_vertex = std::move(*source_vertex), - .src_vertex_properties = std::move(src_vertex_properties), - .edges_with_all_properties = std::move(edges_with_all_properties), - .edges_with_specific_properties = std::move(edges_with_specific_properties)}; + return result_row; } }; // namespace msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { @@ -637,7 +575,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { auto converted_property_map = ConvertPropertyMap(std::move(new_vertex.properties)); // TODO(gvolfing) make sure if this conversion is actually needed. - std::vector<memgraph::storage::v3::LabelId> converted_label_ids; + std::vector<LabelId> converted_label_ids; converted_label_ids.reserve(new_vertex.label_ids.size()); std::transform(new_vertex.label_ids.begin(), new_vertex.label_ids.end(), std::back_inserter(converted_label_ids), @@ -671,7 +609,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { } } - return memgraph::msgs::CreateVerticesResponse{.success = action_successful}; + return msgs::CreateVerticesResponse{.success = action_successful}; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) { @@ -720,7 +658,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateVerticesRequest &&req) { } } - return memgraph::msgs::UpdateVerticesResponse{.success = action_successful}; + return msgs::UpdateVerticesResponse{.success = action_successful}; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) { @@ -743,7 +681,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) { // Since we will not have different kinds of deletion types in one transaction, // we dont have to enter the switch statement on every iteration. Optimize this. switch (req.deletion_type) { - case memgraph::msgs::DeleteVerticesRequest::DeletionType::DELETE: { + case msgs::DeleteVerticesRequest::DeletionType::DELETE: { auto result = acc.DeleteVertex(&vertex_acc.value()); if (result.HasError() || !(result.GetValue().has_value())) { action_successful = false; @@ -752,7 +690,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) { break; } - case memgraph::msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: { + case msgs::DeleteVerticesRequest::DeletionType::DETACH_DELETE: { auto result = acc.DetachDeleteVertex(&vertex_acc.value()); if (result.HasError() || !(result.GetValue().has_value())) { action_successful = false; @@ -766,7 +704,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteVerticesRequest &&req) { } } - return memgraph::msgs::DeleteVerticesResponse{.success = action_successful}; + return msgs::DeleteVerticesResponse{.success = action_successful}; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) { @@ -774,25 +712,20 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) { bool action_successful = true; for (auto &new_expand : req.new_expands) { - auto vertex_acc_from_primary_key = new_expand.src_vertex.second; - auto vertex_from_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_from_primary_key)), View::OLD); + const auto from_vertex_id = + v3::VertexId{new_expand.src_vertex.first.id, ConvertPropertyVector(std::move(new_expand.src_vertex.second))}; - auto vertex_acc_to_primary_key = new_expand.dest_vertex.second; - auto vertex_to_acc = acc.FindVertex(ConvertPropertyVector(std::move(vertex_acc_to_primary_key)), View::OLD); + const auto to_vertex_id = + VertexId{new_expand.dest_vertex.first.id, ConvertPropertyVector(std::move(new_expand.dest_vertex.second))}; - if (!(vertex_from_acc || vertex_to_acc)) { + if (!(shard_->IsVertexBelongToShard(from_vertex_id) || shard_->IsVertexBelongToShard(to_vertex_id))) { action_successful = false; - spdlog::debug("Error while trying to insert edge, vertex does not exist. Transaction id: {}", + spdlog::debug("Error while trying to insert edge, none of the vertices belong to this shard. Transaction id: {}", req.transaction_id.logical_id); break; } - auto from_vertex_id = - VertexId(new_expand.src_vertex.first.id, ConvertPropertyVector(std::move(new_expand.src_vertex.second))); - auto to_vertex_id = - VertexId(new_expand.dest_vertex.first.id, ConvertPropertyVector(std::move(new_expand.dest_vertex.second))); - auto edge_acc = acc.CreateEdge(from_vertex_id, to_vertex_id, EdgeTypeId::FromUint(new_expand.type.id), - Gid::FromUint(new_expand.id.gid)); + auto edge_acc = acc.CreateEdge(from_vertex_id, to_vertex_id, new_expand.type.id, Gid::FromUint(new_expand.id.gid)); if (edge_acc.HasValue()) { auto edge = edge_acc.GetValue(); if (!new_expand.properties.empty()) { @@ -828,7 +761,7 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateExpandRequest &&req) { } } - return memgraph::msgs::CreateExpandResponse{.success = action_successful}; + return msgs::CreateExpandResponse{.success = action_successful}; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) { @@ -850,10 +783,11 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::DeleteEdgesRequest &&req) { } } - return memgraph::msgs::DeleteEdgesResponse{.success = action_successful}; + return msgs::DeleteEdgesResponse{.success = action_successful}; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest &&req) { + // TODO(antaljanosbenjamin): handle when the vertex is the destination vertex auto acc = shard_->Access(req.transaction_id); bool action_successful = true; @@ -908,15 +842,15 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::UpdateEdgesRequest &&req) { } } - return memgraph::msgs::UpdateEdgesResponse{.success = action_successful}; + return msgs::UpdateEdgesResponse{.success = action_successful}; } msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { auto acc = shard_->Access(req.transaction_id); bool action_successful = true; - std::vector<memgraph::msgs::ScanResultRow> results; - std::optional<memgraph::msgs::VertexId> next_start_id; + std::vector<msgs::ScanResultRow> results; + std::optional<msgs::VertexId> next_start_id; const auto view = View(req.storage_view); auto vertex_iterable = acc.Vertices(view); @@ -988,7 +922,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { } } - memgraph::msgs::ScanVerticesResponse resp{}; + msgs::ScanVerticesResponse resp{}; resp.success = action_successful; if (action_successful) { @@ -1003,7 +937,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { auto acc = shard_->Access(req.transaction_id); bool action_successful = true; - std::vector<memgraph::msgs::ExpandOneResultRow> results; + std::vector<msgs::ExpandOneResultRow> results; for (auto &src_vertex : req.src_vertices) { auto result = GetExpandOneResult(acc, src_vertex, req); @@ -1016,7 +950,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { results.emplace_back(result.value()); } - memgraph::msgs::ExpandOneResponse resp{}; + msgs::ExpandOneResponse resp{}; + resp.success = action_successful; if (action_successful) { resp.result = std::move(results); } @@ -1026,12 +961,12 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) { shard_->Access(req.transaction_id).Commit(req.commit_timestamp); - return memgraph::msgs::CommitResponse{true}; + return msgs::CommitResponse{true}; }; // NOLINTNEXTLINE(readability-convert-member-functions-to-static) msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) { - return memgraph::msgs::GetPropertiesResponse{}; + return msgs::GetPropertiesResponse{}; } } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/value_conversions.hpp b/src/storage/v3/value_conversions.hpp index 3787f7c98..5649a6dde 100644 --- a/src/storage/v3/value_conversions.hpp +++ b/src/storage/v3/value_conversions.hpp @@ -10,6 +10,7 @@ // licenses/APL.txt. #include "query/v2/requests.hpp" #include "storage/v3/property_value.hpp" +#include "storage/v3/vertex_id.hpp" #include "utils/logging.hpp" #include <map> @@ -29,8 +30,8 @@ using memgraph::msgs::Value; using memgraph::msgs::VertexId; // TODO(gvolfing use come algorithm instead of explicit for loops) -inline memgraph::storage::v3::PropertyValue ToPropertyValue(Value value) { - using PV = memgraph::storage::v3::PropertyValue; +inline v3::PropertyValue ToPropertyValue(Value value) { + using PV = v3::PropertyValue; PV ret; switch (value.type) { case Value::Type::Null: @@ -65,7 +66,7 @@ inline memgraph::storage::v3::PropertyValue ToPropertyValue(Value value) { return ret; } -inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue &pv) { +inline Value FromPropertyValueToValue(memgraph::storage::v3::PropertyValue &&pv) { using memgraph::storage::v3::PropertyValue; switch (pv.type()) { @@ -78,17 +79,17 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue case PropertyValue::Type::List: { std::vector<Value> list; list.reserve(pv.ValueList().size()); - for (const auto &elem : pv.ValueList()) { - list.emplace_back(FromPropertyValueToValue(elem)); + for (auto &elem : pv.ValueList()) { + list.emplace_back(FromPropertyValueToValue(std::move(elem))); } return Value(list); } case PropertyValue::Type::Map: { std::map<std::string, Value> map; - for (const auto &[key, val] : pv.ValueMap()) { + for (auto &[key, val] : pv.ValueMap()) { // maybe use std::make_pair once the && issue is resolved. - map.emplace(key, FromPropertyValueToValue(val)); + map.emplace(key, FromPropertyValueToValue(std::move(val))); } return Value(map); @@ -96,7 +97,7 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue case PropertyValue::Type::Null: return Value{}; case PropertyValue::Type::String: - return Value(pv.ValueString()); + return Value(std::move(pv.ValueString())); case PropertyValue::Type::TemporalData: { // TBD -> we need to specify this in the messages, not a priority. MG_ASSERT(false, "Temporal datatypes are not yet implemented on Value!"); @@ -105,8 +106,8 @@ inline Value FromPropertyValueToValue(const memgraph::storage::v3::PropertyValue } } -inline std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(std::vector<Value> vec) { - std::vector<memgraph::storage::v3::PropertyValue> ret; +inline std::vector<v3::PropertyValue> ConvertPropertyVector(std::vector<Value> vec) { + std::vector<v3::PropertyValue> ret; ret.reserve(vec.size()); for (auto &elem : vec) { @@ -116,15 +117,18 @@ inline std::vector<memgraph::storage::v3::PropertyValue> ConvertPropertyVector(s return ret; } -inline std::vector<Value> ConvertValueVector(const std::vector<memgraph::storage::v3::PropertyValue> &vec) { +inline std::vector<Value> ConvertValueVector(const std::vector<v3::PropertyValue> &vec) { std::vector<Value> ret; ret.reserve(vec.size()); for (const auto &elem : vec) { - ret.push_back(FromPropertyValueToValue(elem)); + ret.push_back(FromPropertyValueToValue(v3::PropertyValue{elem})); } return ret; } +inline msgs::VertexId ToMsgsVertexId(const v3::VertexId &vertex_id) { + return {msgs::Label{vertex_id.primary_label}, ConvertValueVector(vertex_id.primary_key)}; +} } // namespace memgraph::storage::conversions diff --git a/src/storage/v3/vertex_accessor.cpp b/src/storage/v3/vertex_accessor.cpp index 7eae92b0a..9d4e94c68 100644 --- a/src/storage/v3/vertex_accessor.cpp +++ b/src/storage/v3/vertex_accessor.cpp @@ -453,6 +453,7 @@ Result<std::map<PropertyId, PropertyValue>> VertexAccessor::Properties(View view Delta *delta = nullptr; { deleted = vertex_->deleted; + // TODO(antaljanosbenjamin): This should also return the primary key properties = vertex_->properties.Properties(); delta = vertex_->delta; } diff --git a/tests/e2e/distributed_queries/distributed_queries.py b/tests/e2e/distributed_queries/distributed_queries.py index aafd0e13d..68ebdef8d 100644 --- a/tests/e2e/distributed_queries/distributed_queries.py +++ b/tests/e2e/distributed_queries/distributed_queries.py @@ -48,15 +48,28 @@ def test_vertex_creation_and_scanall(connection): wait_for_shard_manager_to_initialize() cursor = connection.cursor() - assert has_n_result_row(cursor, "CREATE (n :label {property:1, asd:2})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:2, asd:2})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:3, asd:2})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:4, asd:2})", 0) - assert has_n_result_row(cursor, "CREATE (n :label {property:5, asd:2})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0) - assert has_n_result_row(cursor, "MATCH (n) RETURN n", 5) - assert has_n_result_row(cursor, "MATCH (n) RETURN *", 5) - assert has_n_result_row(cursor, "MATCH (n :label) RETURN *", 5) + assert has_n_result_row(cursor, "MATCH (n) RETURN n", 3) + assert has_n_result_row(cursor, "MATCH (n) RETURN *", 3) + assert has_n_result_row(cursor, "MATCH (n :label) RETURN *", 3) + + assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0) + + results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN n,r,m") + assert len(results) == 9 + for (n, r, m) in results: + n_props = n.properties + assert len(n_props) == 0, "n is not expected to have properties, update the test!" + assert len(n.labels) == 0, "n is not expected to have labels, update the test!" + + assert r.type == "TO" + + m_props = m.properties + assert m_props["property"] <= 3 and m_props["property"] >= 0, "Wrong key" + assert len(m.labels) == 0, "m is not expected to have labels, update the test!" if __name__ == "__main__": diff --git a/tests/simulation/common.hpp b/tests/simulation/common.hpp index 9dd21536d..a73bf37a5 100644 --- a/tests/simulation/common.hpp +++ b/tests/simulation/common.hpp @@ -43,26 +43,18 @@ #include "storage/v3/value_conversions.hpp" #include "utils/logging.hpp" -using memgraph::coordinator::Hlc; -using memgraph::io::rsm::StorageWriteRequest; -using memgraph::io::rsm::StorageWriteResponse; -using memgraph::io::simulator::Simulator; -using memgraph::io::simulator::SimulatorConfig; -using memgraph::io::simulator::SimulatorStats; -using memgraph::io::simulator::SimulatorTransport; -using memgraph::msgs::CreateVerticesRequest; -using memgraph::msgs::CreateVerticesResponse; -using memgraph::msgs::ExpandOneRequest; -using memgraph::msgs::ExpandOneResponse; -using memgraph::msgs::ListedValues; -using memgraph::msgs::ScanVerticesRequest; -using memgraph::msgs::ScanVerticesResponse; -using memgraph::msgs::Value; -using memgraph::msgs::VertexId; -using memgraph::storage::v3::LabelId; -using memgraph::storage::v3::PropertyValue; +namespace memgraph::storage::v3::tests { +using coordinator::Hlc; +using io::rsm::StorageWriteRequest; +using io::rsm::StorageWriteResponse; +using io::simulator::Simulator; +using io::simulator::SimulatorConfig; +using io::simulator::SimulatorStats; +using io::simulator::SimulatorTransport; +using storage::v3::LabelId; +using storage::v3::PropertyValue; -using ShardRsmKey = std::vector<memgraph::storage::v3::PropertyValue>; +using ShardRsmKey = std::vector<storage::v3::PropertyValue>; class MockedShardRsm { std::map<ShardRsmKey, int> state_; @@ -79,32 +71,37 @@ class MockedShardRsm { } public: + using ReadRequests = msgs::ReadRequests; + using ReadResponses = msgs::ReadResponses; + using WriteRequests = msgs::WriteRequests; + using WriteResponses = msgs::WriteResponses; + // ExpandOneResponse Read(ExpandOneRequest rqst); // GetPropertiesResponse Read(GetPropertiesRequest rqst); - ScanVerticesResponse ReadImpl(ScanVerticesRequest rqst) { - ScanVerticesResponse ret; - auto as_prop_val = memgraph::storage::conversions::ConvertPropertyVector(rqst.start_id.second); + msgs::ScanVerticesResponse ReadImpl(msgs::ScanVerticesRequest rqst) { + msgs::ScanVerticesResponse ret; + auto as_prop_val = storage::conversions::ConvertPropertyVector(rqst.start_id.second); if (!IsKeyInRange(as_prop_val)) { ret.success = false; } else if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) { - Value val(int64_t(0)); - ret.next_start_id = std::make_optional<VertexId>(); + msgs::Value val(int64_t(0)); + ret.next_start_id = std::make_optional<msgs::VertexId>(); ret.next_start_id->second = - memgraph::storage::conversions::ConvertValueVector(ShardRsmKey{PropertyValue(1), PropertyValue(0)}); - memgraph::msgs::ScanResultRow result; - result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val)); + storage::conversions::ConvertValueVector(ShardRsmKey{PropertyValue(1), PropertyValue(0)}); + msgs::ScanResultRow result; + result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); ret.success = true; } else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) { - memgraph::msgs::ScanResultRow result; - Value val(int64_t(1)); - result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val)); + msgs::ScanResultRow result; + msgs::Value val(int64_t(1)); + result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); ret.success = true; } else if (as_prop_val == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) { - memgraph::msgs::ScanResultRow result; - Value val(int64_t(444)); - result.props.push_back(std::make_pair(memgraph::msgs::PropertyId::FromUint(0), val)); + msgs::ScanResultRow result; + msgs::Value val(int64_t(444)); + result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); ret.success = true; } else { @@ -113,14 +110,25 @@ class MockedShardRsm { return ret; } - ExpandOneResponse ReadImpl(ExpandOneRequest rqst) { return {}; } - using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>; - using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>; + msgs::ExpandOneResponse ReadImpl(msgs::ExpandOneRequest rqst) { return {}; } + msgs::ExpandOneResponse ReadImpl(msgs::GetPropertiesRequest rqst) { return {}; } ReadResponses Read(ReadRequests read_requests) { - return {std::visit([this](auto &&request) { return ReadResponses{ReadImpl(std::move(request))}; }, + return {std::visit([this]<typename T>(T &&request) { return ReadResponses{ReadImpl(std::forward<T>(request))}; }, std::move(read_requests))}; } - CreateVerticesResponse Apply(CreateVerticesRequest request) { return CreateVerticesResponse{.success = true}; } + msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {.success = true}; } + msgs::DeleteVerticesResponse ApplyImpl(msgs::DeleteVerticesRequest rqst) { return {}; } + msgs::UpdateVerticesResponse ApplyImpl(msgs::UpdateVerticesRequest rqst) { return {}; } + msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {.success = true}; } + msgs::DeleteEdgesResponse ApplyImpl(msgs::DeleteEdgesRequest rqst) { return {}; } + msgs::UpdateEdgesResponse ApplyImpl(msgs::UpdateEdgesRequest rqst) { return {}; } + msgs::CommitResponse ApplyImpl(msgs::CommitRequest rqst) { return {}; } + + WriteResponses Apply(WriteRequests write_requests) { + return {std::visit([this]<typename T>(T &&request) { return WriteResponses{ApplyImpl(std::forward<T>(request))}; }, + std::move(write_requests))}; + } }; +} // namespace memgraph::storage::v3::tests diff --git a/tests/simulation/shard_request_manager.cpp b/tests/simulation/shard_request_manager.cpp index fca13a514..746ab385f 100644 --- a/tests/simulation/shard_request_manager.cpp +++ b/tests/simulation/shard_request_manager.cpp @@ -36,51 +36,51 @@ #include "storage/v3/property_value.hpp" #include "utils/result.hpp" -using memgraph::coordinator::AddressAndStatus; -using CompoundKey = memgraph::coordinator::PrimaryKey; -using memgraph::coordinator::Coordinator; -using memgraph::coordinator::CoordinatorClient; -using memgraph::coordinator::CoordinatorRsm; -using memgraph::coordinator::HlcRequest; -using memgraph::coordinator::HlcResponse; -using memgraph::coordinator::Shard; -using memgraph::coordinator::ShardMap; -using memgraph::coordinator::Shards; -using memgraph::coordinator::Status; -using memgraph::io::Address; -using memgraph::io::Io; -using memgraph::io::ResponseEnvelope; -using memgraph::io::ResponseFuture; -using memgraph::io::Time; -using memgraph::io::TimedOut; -using memgraph::io::rsm::Raft; -using memgraph::io::rsm::ReadRequest; -using memgraph::io::rsm::ReadResponse; -using memgraph::io::rsm::StorageReadRequest; -using memgraph::io::rsm::StorageReadResponse; -using memgraph::io::rsm::StorageWriteRequest; -using memgraph::io::rsm::StorageWriteResponse; -using memgraph::io::rsm::WriteRequest; -using memgraph::io::rsm::WriteResponse; -using memgraph::io::simulator::Simulator; -using memgraph::io::simulator::SimulatorConfig; -using memgraph::io::simulator::SimulatorStats; -using memgraph::io::simulator::SimulatorTransport; -using memgraph::msgs::CreateVerticesRequest; -using memgraph::msgs::CreateVerticesResponse; -using memgraph::msgs::ListedValues; -using memgraph::msgs::NewVertexLabel; -using memgraph::msgs::ScanVerticesRequest; -using memgraph::msgs::ScanVerticesResponse; -using memgraph::storage::v3::LabelId; -using memgraph::storage::v3::SchemaProperty; -using memgraph::utils::BasicResult; +namespace memgraph::query::v2::tests { +using coordinator::AddressAndStatus; +using CompoundKey = coordinator::PrimaryKey; +using coordinator::Coordinator; +using coordinator::CoordinatorClient; +using coordinator::CoordinatorRsm; +using coordinator::HlcRequest; +using coordinator::HlcResponse; +using coordinator::Shard; +using coordinator::ShardMap; +using coordinator::Shards; +using coordinator::Status; +using io::Address; +using io::Io; +using io::ResponseEnvelope; +using io::ResponseFuture; +using io::Time; +using io::TimedOut; +using io::rsm::Raft; +using io::rsm::ReadRequest; +using io::rsm::ReadResponse; +using io::rsm::StorageReadRequest; +using io::rsm::StorageReadResponse; +using io::rsm::StorageWriteRequest; +using io::rsm::StorageWriteResponse; +using io::rsm::WriteRequest; +using io::rsm::WriteResponse; +using io::simulator::Simulator; +using io::simulator::SimulatorConfig; +using io::simulator::SimulatorStats; +using io::simulator::SimulatorTransport; +using msgs::CreateVerticesRequest; +using msgs::CreateVerticesResponse; +using msgs::ScanVerticesRequest; +using msgs::ScanVerticesResponse; +using msgs::VertexId; +using storage::v3::LabelId; +using storage::v3::SchemaProperty; +using storage::v3::tests::MockedShardRsm; +using utils::BasicResult; namespace { -ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::coordinator::Address a_io_2, - memgraph::coordinator::Address a_io_3, memgraph::coordinator::Address b_io_1, - memgraph::coordinator::Address b_io_2, memgraph::coordinator::Address b_io_3) { +ShardMap CreateDummyShardmap(coordinator::Address a_io_1, coordinator::Address a_io_2, coordinator::Address a_io_3, + coordinator::Address b_io_1, coordinator::Address b_io_2, coordinator::Address b_io_3) { static const std::string label_name = std::string("test_label"); ShardMap sm; @@ -89,8 +89,8 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co const auto properties = sm.AllocatePropertyIds(property_names); const auto property_id_1 = properties.at("property_1"); const auto property_id_2 = properties.at("property_2"); - const auto type_1 = memgraph::common::SchemaType::INT; - const auto type_2 = memgraph::common::SchemaType::INT; + const auto type_1 = common::SchemaType::INT; + const auto type_2 = common::SchemaType::INT; // register new label space std::vector<SchemaProperty> schema = { @@ -113,8 +113,8 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co Shard shard1 = {aas1_1, aas1_2, aas1_3}; - auto key1 = memgraph::storage::v3::PropertyValue(0); - auto key2 = memgraph::storage::v3::PropertyValue(0); + auto key1 = storage::v3::PropertyValue(0); + auto key2 = storage::v3::PropertyValue(0); CompoundKey compound_key_1 = {key1, key2}; shards_for_label[compound_key_1] = shard1; @@ -125,20 +125,22 @@ ShardMap CreateDummyShardmap(memgraph::coordinator::Address a_io_1, memgraph::co Shard shard2 = {aas2_1, aas2_2, aas2_3}; - auto key3 = memgraph::storage::v3::PropertyValue(12); - auto key4 = memgraph::storage::v3::PropertyValue(13); + auto key3 = storage::v3::PropertyValue(12); + auto key4 = storage::v3::PropertyValue(13); CompoundKey compound_key_2 = {key3, key4}; shards_for_label[compound_key_2] = shard2; + sm.AllocateEdgeTypeIds(std::vector<coordinator::EdgeTypeName>{"edge_type"}); + return sm; } } // namespace -using WriteRequests = CreateVerticesRequest; -using WriteResponses = CreateVerticesResponse; -using ReadRequests = std::variant<ScanVerticesRequest, ExpandOneRequest>; -using ReadResponses = std::variant<ScanVerticesResponse, ExpandOneResponse>; +using WriteRequests = msgs::WriteRequests; +using WriteResponses = msgs::WriteResponses; +using ReadRequests = msgs::ReadRequests; +using ReadResponses = msgs::ReadResponses; using ConcreteCoordinatorRsm = CoordinatorRsm<SimulatorTransport>; using ConcreteStorageRsm = @@ -149,40 +151,34 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses, server.Run(); } -template <typename ShardRequestManager> -void TestScanAll(ShardRequestManager &io) { - memgraph::msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"}; +void TestScanVertices(msgs::ShardRequestManagerInterface &io) { + msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"}; auto result = io.Request(state); MG_ASSERT(result.size() == 2); { - auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0)); + auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); MG_ASSERT(prop.int_v == 0); - prop = result[1].GetProperty(memgraph::msgs::PropertyId::FromUint(0)); + prop = result[1].GetProperty(msgs::PropertyId::FromUint(0)); MG_ASSERT(prop.int_v == 444); } result = io.Request(state); { MG_ASSERT(result.size() == 1); - auto prop = result[0].GetProperty(memgraph::msgs::PropertyId::FromUint(0)); + auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); MG_ASSERT(prop.int_v == 1); } - - // Exhaust it, request should be empty - result = io.Request(state); - MG_ASSERT(result.size() == 0); } -template <typename ShardRequestManager> -void TestCreateVertices(ShardRequestManager &io) { - using PropVal = memgraph::msgs::Value; - memgraph::msgs::ExecutionState<CreateVerticesRequest> state; - std::vector<memgraph::msgs::NewVertex> new_vertices; - auto label_id = io.LabelNameToLabelId("test_label"); - memgraph::msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}}; +void TestCreateVertices(msgs::ShardRequestManagerInterface &io) { + using PropVal = msgs::Value; + msgs::ExecutionState<CreateVerticesRequest> state; + std::vector<msgs::NewVertex> new_vertices; + auto label_id = io.NameToLabel("test_label"); + msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); - memgraph::msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; a2.label_ids.push_back({label_id}); new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); @@ -191,151 +187,176 @@ void TestCreateVertices(ShardRequestManager &io) { MG_ASSERT(result.size() == 2); } -template <typename ShardRequestManager> -void TestCreateExpand(ShardRequestManager &io) { - using PropVal = memgraph::msgs::Value; - memgraph::msgs::ExecutionState<memgraph::msgs::CreateExpandRequest> state; - std::vector<memgraph::msgs::NewExpand> new_expands; +void TestCreateExpand(msgs::ShardRequestManagerInterface &io) { + using PropVal = msgs::Value; + msgs::ExecutionState<msgs::CreateExpandRequest> state; + std::vector<msgs::NewExpand> new_expands; const auto edge_type_id = io.NameToEdgeType("edge_type"); - const auto label_id = io.LabelNameToLabelId("test_label"); - const VertexId vertex_id_1{label_id, {PropVal(int64_t(1)), PropVal(int64_t(0))}}; - const VertexId vertex_id_2{label_id, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; - memgraph::msgs::NewExpand expand_1{ - .id = 0, .type = edge_type_id, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2}; - memgraph::msgs::NewExpand expand_2{ - .id = 1, .type = edge_type_id, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1}; + const auto label = msgs::Label{io.NameToLabel("test_label")}; + const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; + const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + msgs::NewExpand expand_1{ + .id = {.gid = 0}, .type = {edge_type_id}, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2}; + msgs::NewExpand expand_2{ + .id = {.gid = 1}, .type = {edge_type_id}, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1}; new_expands.push_back(std::move(expand_1)); + new_expands.push_back(std::move(expand_2)); - auto result = io.Request(state, std::move(new_expands)); - MG_ASSERT(result.size() == 2); + auto responses = io.Request(state, std::move(new_expands)); + MG_ASSERT(responses.size() == 2); + MG_ASSERT(responses[0].success); + MG_ASSERT(responses[1].success); } -template <typename ShardRequestManager> -void TestExpand(ShardRequestManager &io) {} +void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) { + msgs::ExecutionState<msgs::ExpandOneRequest> state{}; + msgs::ExpandOneRequest request; + const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); + const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); + request.edge_types.push_back(msgs::EdgeType{edge_type_id}); + request.direction = msgs::EdgeDirection::BOTH; + auto result_rows = shard_request_manager.Request(state, std::move(request)); + MG_ASSERT(result_rows.size() == 2); +} template <typename ShardRequestManager> void TestAggregate(ShardRequestManager &io) {} -int main() { - // SimulatorConfig config{ - // .drop_percent = 0, - // .perform_timeouts = false, - // .scramble_messages = false, - // .rng_seed = 0, - // .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, - // .abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024}, - // }; +void DoTest() { + SimulatorConfig config{ + .drop_percent = 0, + .perform_timeouts = false, + .scramble_messages = false, + .rng_seed = 0, + .start_time = Time::min() + std::chrono::microseconds{256 * 1024}, + .abort_time = Time::min() + std::chrono::microseconds{2 * 8 * 1024 * 1024}, + }; - // auto simulator = Simulator(config); - // const auto one_second = std::chrono::seconds(1); + auto simulator = Simulator(config); + const auto one_second = std::chrono::seconds(1); - // Io<SimulatorTransport> cli_io = simulator.RegisterNew(); - // cli_io.SetDefaultTimeout(one_second); + Io<SimulatorTransport> cli_io = simulator.RegisterNew(); + cli_io.SetDefaultTimeout(one_second); - // // Register - // Io<SimulatorTransport> a_io_1 = simulator.RegisterNew(); - // a_io_1.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> a_io_2 = simulator.RegisterNew(); - // a_io_2.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> a_io_3 = simulator.RegisterNew(); - // a_io_3.SetDefaultTimeout(one_second); + // Register + Io<SimulatorTransport> a_io_1 = simulator.RegisterNew(); + a_io_1.SetDefaultTimeout(one_second); + Io<SimulatorTransport> a_io_2 = simulator.RegisterNew(); + a_io_2.SetDefaultTimeout(one_second); + Io<SimulatorTransport> a_io_3 = simulator.RegisterNew(); + a_io_3.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> b_io_1 = simulator.RegisterNew(); - // b_io_1.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> b_io_2 = simulator.RegisterNew(); - // b_io_2.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> b_io_3 = simulator.RegisterNew(); - // b_io_3.SetDefaultTimeout(one_second); + Io<SimulatorTransport> b_io_1 = simulator.RegisterNew(); + b_io_1.SetDefaultTimeout(one_second); + Io<SimulatorTransport> b_io_2 = simulator.RegisterNew(); + b_io_2.SetDefaultTimeout(one_second); + Io<SimulatorTransport> b_io_3 = simulator.RegisterNew(); + b_io_3.SetDefaultTimeout(one_second); - // // Preconfigure coordinator with kv shard 'A' and 'B' - // auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), - // b_io_2.GetAddress(), b_io_3.GetAddress()); - // auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), - // b_io_2.GetAddress(), b_io_3.GetAddress()); - // auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), - // b_io_2.GetAddress(), b_io_3.GetAddress()); + // Preconfigure coordinator with kv shard 'A' and 'B' + auto sm1 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), + b_io_2.GetAddress(), b_io_3.GetAddress()); + auto sm2 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), + b_io_2.GetAddress(), b_io_3.GetAddress()); + auto sm3 = CreateDummyShardmap(a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress(), b_io_1.GetAddress(), + b_io_2.GetAddress(), b_io_3.GetAddress()); - // // Spin up shard A - // std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()}; + // Spin up shard A + std::vector<Address> a_addrs = {a_io_1.GetAddress(), a_io_2.GetAddress(), a_io_3.GetAddress()}; - // std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]}; - // std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]}; - // std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]}; + std::vector<Address> a_1_peers = {a_addrs[1], a_addrs[2]}; + std::vector<Address> a_2_peers = {a_addrs[0], a_addrs[2]}; + std::vector<Address> a_3_peers = {a_addrs[0], a_addrs[1]}; - // ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}}; - // ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}}; - // ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}}; + ConcreteStorageRsm a_1{std::move(a_io_1), a_1_peers, MockedShardRsm{}}; + ConcreteStorageRsm a_2{std::move(a_io_2), a_2_peers, MockedShardRsm{}}; + ConcreteStorageRsm a_3{std::move(a_io_3), a_3_peers, MockedShardRsm{}}; - // auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1)); - // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]); + auto a_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_1)); + simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[0]); - // auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2)); - // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]); + auto a_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_2)); + simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[1]); - // auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3)); - // simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]); + auto a_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(a_3)); + simulator.IncrementServerCountAndWaitForQuiescentState(a_addrs[2]); - // // Spin up shard B - // std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()}; + // Spin up shard B + std::vector<Address> b_addrs = {b_io_1.GetAddress(), b_io_2.GetAddress(), b_io_3.GetAddress()}; - // std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]}; - // std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]}; - // std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]}; + std::vector<Address> b_1_peers = {b_addrs[1], b_addrs[2]}; + std::vector<Address> b_2_peers = {b_addrs[0], b_addrs[2]}; + std::vector<Address> b_3_peers = {b_addrs[0], b_addrs[1]}; - // ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}}; - // ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}}; - // ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}}; + ConcreteStorageRsm b_1{std::move(b_io_1), b_1_peers, MockedShardRsm{}}; + ConcreteStorageRsm b_2{std::move(b_io_2), b_2_peers, MockedShardRsm{}}; + ConcreteStorageRsm b_3{std::move(b_io_3), b_3_peers, MockedShardRsm{}}; - // auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1)); - // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]); + auto b_thread_1 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_1)); + simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[0]); - // auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2)); - // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]); + auto b_thread_2 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_2)); + simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[1]); - // auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3)); - // simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]); + auto b_thread_3 = std::jthread(RunStorageRaft<SimulatorTransport>, std::move(b_3)); + simulator.IncrementServerCountAndWaitForQuiescentState(b_addrs[2]); - // // Spin up coordinators + // Spin up coordinators - // Io<SimulatorTransport> c_io_1 = simulator.RegisterNew(); - // c_io_1.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> c_io_2 = simulator.RegisterNew(); - // c_io_2.SetDefaultTimeout(one_second); - // Io<SimulatorTransport> c_io_3 = simulator.RegisterNew(); - // c_io_3.SetDefaultTimeout(one_second); + Io<SimulatorTransport> c_io_1 = simulator.RegisterNew(); + c_io_1.SetDefaultTimeout(one_second); + Io<SimulatorTransport> c_io_2 = simulator.RegisterNew(); + c_io_2.SetDefaultTimeout(one_second); + Io<SimulatorTransport> c_io_3 = simulator.RegisterNew(); + c_io_3.SetDefaultTimeout(one_second); - // std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()}; + std::vector<Address> c_addrs = {c_io_1.GetAddress(), c_io_2.GetAddress(), c_io_3.GetAddress()}; - // std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]}; - // std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]}; - // std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]}; + std::vector<Address> c_1_peers = {c_addrs[1], c_addrs[2]}; + std::vector<Address> c_2_peers = {c_addrs[0], c_addrs[2]}; + std::vector<Address> c_3_peers = {c_addrs[0], c_addrs[1]}; - // ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}}; - // ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}}; - // ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}}; + ConcreteCoordinatorRsm c_1{std::move(c_io_1), c_1_peers, Coordinator{(sm1)}}; + ConcreteCoordinatorRsm c_2{std::move(c_io_2), c_2_peers, Coordinator{(sm2)}}; + ConcreteCoordinatorRsm c_3{std::move(c_io_3), c_3_peers, Coordinator{(sm3)}}; - // auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); }); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]); + auto c_thread_1 = std::jthread([c_1]() mutable { c_1.Run(); }); + simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[0]); - // auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); }); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]); + auto c_thread_2 = std::jthread([c_2]() mutable { c_2.Run(); }); + simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[1]); - // auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); }); - // simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]); + auto c_thread_3 = std::jthread([c_3]() mutable { c_3.Run(); }); + simulator.IncrementServerCountAndWaitForQuiescentState(c_addrs[2]); - // std::cout << "beginning test after servers have become quiescent" << std::endl; + std::cout << "beginning test after servers have become quiescent" << std::endl; - // // Have client contact coordinator RSM for a new transaction ID and - // // also get the current shard map - // CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs); + // Have client contact coordinator RSM for a new transaction ID and + // also get the current shard map + CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs); - // memgraph::msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io)); + msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io)); - // io.StartTransaction(); - // TestScanAll(io); - // TestCreateVertices(io); + io.StartTransaction(); + TestScanVertices(io); + TestCreateVertices(io); + TestCreateExpand(io); - // simulator.ShutDown(); - return 0; + simulator.ShutDown(); + + SimulatorStats stats = simulator.Stats(); + + std::cout << "total messages: " << stats.total_messages << std::endl; + std::cout << "dropped messages: " << stats.dropped_messages << std::endl; + std::cout << "timed out requests: " << stats.timed_out_requests << std::endl; + std::cout << "total requests: " << stats.total_requests << std::endl; + std::cout << "total responses: " << stats.total_responses << std::endl; + std::cout << "simulator ticks: " << stats.simulator_ticks << std::endl; + + std::cout << "========================== SUCCESS :) ==========================" << std::endl; } +} // namespace memgraph::query::v2::tests + +int main() { memgraph::query::v2::tests::DoTest(); } diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index d7d359ce2..953e0579a 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -136,6 +136,7 @@ void Commit(ShardClient &client, const coordinator::Hlc &transaction_timestamp) auto write_response_result = write_res.GetValue(); auto write_response = std::get<msgs::CommitResponse>(write_response_result); + MG_ASSERT(write_response.success, "Commit expected to be successful, but it is failed"); break; } @@ -210,7 +211,7 @@ bool AttemptToUpdateVertex(ShardClient &client, int64_t value) { } bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid, - int64_t edge_type_id) { + EdgeTypeId edge_type_id) { auto id = msgs::EdgeId{}; msgs::Label label = {.id = get_primary_label()}; @@ -249,7 +250,7 @@ bool AttemptToAddEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t va bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid, uint64_t edge_prop_id, int64_t edge_prop_val, - const std::vector<uint64_t> &edge_type_id) { + const std::vector<EdgeTypeId> &edge_type_id) { msgs::EdgeId id1; msgs::Label label = {.id = get_primary_label()}; @@ -282,7 +283,7 @@ bool AttemptToAddEdgeWithProperties(ShardClient &client, int64_t value_of_vertex } bool AttemptToDeleteEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid, - int64_t edge_type_id) { + EdgeTypeId edge_type_id) { auto id = msgs::EdgeId{}; msgs::Label label = {.id = get_primary_label()}; @@ -319,7 +320,7 @@ bool AttemptToDeleteEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t } bool AttemptToUpdateEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t value_of_vertex_2, int64_t edge_gid, - int64_t edge_type_id, uint64_t edge_prop_id, int64_t edge_prop_val) { + EdgeTypeId edge_type_id, uint64_t edge_prop_id, int64_t edge_prop_val) { auto id = msgs::EdgeId{}; msgs::Label label = {.id = get_primary_label()}; @@ -338,7 +339,7 @@ bool AttemptToUpdateEdge(ShardClient &client, int64_t value_of_vertex_1, int64_t auto edge_prop = std::vector<std::pair<PropertyId, msgs::Value>>{ std::make_pair(PropertyId::FromUint(edge_prop_id), msgs::Value(edge_prop_val))}; - msgs::UpdateEdgeProp update_props{.src = src, .dst = dst, .edge_id = id, .property_updates = edge_prop}; + msgs::UpdateEdgeProp update_props{.edge_id = id, .src = src, .dst = dst, .property_updates = edge_prop}; msgs::UpdateEdgesRequest update_req{}; update_req.transaction_id.logical_id = GetTransactionId(); @@ -444,61 +445,7 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression } } -void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) { - // Source vertex - msgs::Label label = {.id = get_primary_label()}; - auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); - - // Edge type - auto edge_type = msgs::EdgeType{}; - edge_type.id = edge_type_id + 1; - - // Edge direction - auto edge_direction = msgs::EdgeDirection::OUT; - - // Source Vertex properties to look for - std::optional<std::vector<PropertyId>> src_vertex_properties = {}; - - // Edge properties to look for - std::optional<std::vector<PropertyId>> edge_properties = {}; - - std::vector<msgs::Expression> expressions; - std::optional<std::vector<msgs::OrderBy>> order_by = {}; - std::optional<size_t> limit = {}; - std::optional<msgs::Filter> filter = {}; - - msgs::ExpandOneRequest expand_one_req{}; - - expand_one_req.direction = edge_direction; - expand_one_req.edge_properties = edge_properties; - expand_one_req.edge_types = {edge_type}; - expand_one_req.expressions = expressions; - expand_one_req.filter = filter; - expand_one_req.limit = limit; - expand_one_req.order_by = order_by; - expand_one_req.src_vertex_properties = src_vertex_properties; - expand_one_req.src_vertices = {src_vertex}; - expand_one_req.transaction_id.logical_id = GetTransactionId(); - - while (true) { - auto read_res = client.SendReadRequest(expand_one_req); - if (read_res.HasError()) { - continue; - } - - auto write_response_result = read_res.GetValue(); - auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); - MG_ASSERT(write_response.result.size() == 1); - - MG_ASSERT(write_response.result[0].edges_with_all_properties); - MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 0); - MG_ASSERT(!write_response.result[0].edges_with_specific_properties); - - break; - } -} - -void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id) { +void AttemptToExpandOneWithWrongEdgeType(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) { // Source vertex msgs::Label label = {.id = get_primary_label()}; auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); @@ -543,17 +490,74 @@ void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, uint auto write_response_result = read_res.GetValue(); auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); MG_ASSERT(write_response.result.size() == 1); - MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2); - auto number_of_properties_on_edge = - (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0])) - .size(); + + MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty()); + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.empty()); + MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty()); + MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty()); + + break; + } +} + +void AttemptToExpandOneSimple(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) { + // Source vertex + msgs::Label label = {.id = get_primary_label()}; + auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); + + // Edge type + auto edge_type = msgs::EdgeType{}; + edge_type.id = edge_type_id; + + // Edge direction + auto edge_direction = msgs::EdgeDirection::OUT; + + // Source Vertex properties to look for + std::optional<std::vector<PropertyId>> src_vertex_properties = {}; + + // Edge properties to look for + std::optional<std::vector<PropertyId>> edge_properties = {}; + + std::vector<msgs::Expression> expressions; + std::optional<std::vector<msgs::OrderBy>> order_by = {}; + std::optional<size_t> limit = {}; + std::optional<msgs::Filter> filter = {}; + + msgs::ExpandOneRequest expand_one_req{}; + + expand_one_req.direction = edge_direction; + expand_one_req.edge_properties = edge_properties; + expand_one_req.edge_types = {edge_type}; + expand_one_req.expressions = expressions; + expand_one_req.filter = filter; + expand_one_req.limit = limit; + expand_one_req.order_by = order_by; + expand_one_req.src_vertex_properties = src_vertex_properties; + expand_one_req.src_vertices = {src_vertex}; + expand_one_req.transaction_id.logical_id = GetTransactionId(); + + while (true) { + auto read_res = client.SendReadRequest(expand_one_req); + if (read_res.HasError()) { + continue; + } + + auto write_response_result = read_res.GetValue(); + auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); + MG_ASSERT(write_response.result.size() == 1); + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 2); + MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty()); + MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty()); + MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty()); + const auto number_of_properties_on_edge = + (write_response.result[0].out_edges_with_all_properties[0]).properties.size(); MG_ASSERT(number_of_properties_on_edge == 1); break; } } void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val, - uint64_t edge_type_id) { + EdgeTypeId edge_type_id) { // Source vertex msgs::Label label = {.id = get_primary_label()}; auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); @@ -599,19 +603,21 @@ void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uin auto write_response_result = read_res.GetValue(); auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); MG_ASSERT(write_response.result.size() == 1); - auto src_vertex_props_size = write_response.result[0].src_vertex_properties->size(); + auto src_vertex_props_size = write_response.result[0].src_vertex_properties.size(); MG_ASSERT(src_vertex_props_size == 1); - MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 2); - auto number_of_properties_on_edge = - (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0])) - .size(); + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.size() == 2); + MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty()); + MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty()); + MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.empty()); + const auto number_of_properties_on_edge = + (write_response.result[0].out_edges_with_all_properties[0]).properties.size(); MG_ASSERT(number_of_properties_on_edge == 1); break; } } -void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t src_vertex_val, uint64_t edge_type_id, - uint64_t edge_prop_id) { +void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t src_vertex_val, + EdgeTypeId edge_type_id, uint64_t edge_prop_id) { // Source vertex msgs::Label label = {.id = get_primary_label()}; auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val)); @@ -657,9 +663,13 @@ void AttemptToExpandOneWithSpecifiedEdgeProperties(ShardClient &client, uint64_t auto write_response_result = read_res.GetValue(); auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result); MG_ASSERT(write_response.result.size() == 1); - auto specific_properties_size = - (std::get<std::vector<msgs::Value>>(write_response.result[0].edges_with_specific_properties.value()[0])); - MG_ASSERT(specific_properties_size.size() == 1); + MG_ASSERT(write_response.result[0].out_edges_with_specific_properties.size() == 2); + MG_ASSERT(write_response.result[0].in_edges_with_specific_properties.empty()); + MG_ASSERT(write_response.result[0].in_edges_with_all_properties.empty()); + MG_ASSERT(write_response.result[0].out_edges_with_all_properties.empty()); + const auto specific_properties_size = + (write_response.result[0].out_edges_with_specific_properties[0]).properties.size(); + MG_ASSERT(specific_properties_size == 1); break; } } @@ -693,7 +703,7 @@ void TestCreateEdge(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); auto edge_gid = GetUniqueInteger(); - auto edge_type_id = GetUniqueInteger(); + auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id)); } @@ -707,7 +717,7 @@ void TestCreateAndDeleteEdge(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); auto edge_gid = GetUniqueInteger(); - auto edge_type_id = GetUniqueInteger(); + auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); MG_ASSERT(AttemptToAddEdge(client, unique_prop_val_1, unique_prop_val_2, edge_gid, edge_type_id)); @@ -724,7 +734,7 @@ void TestUpdateEdge(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); auto edge_gid = GetUniqueInteger(); - auto edge_type_id = GetUniqueInteger(); + auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); auto edge_prop_id = GetUniqueInteger(); auto edge_prop_val_old = GetUniqueInteger(); @@ -819,7 +829,8 @@ void TestExpandOne(ShardClient &client) { MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3)); - auto edge_type_id = GetUniqueInteger(); + auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); + auto wrong_edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); auto edge_gid_1 = GetUniqueInteger(); auto edge_gid_2 = GetUniqueInteger(); @@ -835,7 +846,7 @@ void TestExpandOne(ShardClient &client) { edge_prop_val, {edge_type_id})); AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id); - AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, edge_type_id); + AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id); AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id); AttemptToExpandOneWithSpecifiedEdgeProperties(client, unique_prop_val_1, edge_type_id, edge_prop_id); } diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 630beedbf..99e18a961 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -86,6 +86,7 @@ ShardMap TestShardMap() { const auto label_id = sm.InitializeNewLabel(kLabelName, schema, replication_factor, sm.shard_map_version); EXPECT_TRUE(label_id.has_value()); + sm.AllocateEdgeTypeIds(std::vector<std::string>{"edge_type"}); // split the shard at N split points // NB: this is the logic that should be provided by the "split file" // TODO(tyler) split points should account for signedness @@ -116,12 +117,11 @@ void TestScanAll(ShardRequestManager &shard_request_manager) { EXPECT_EQ(result.size(), 2); } -template <typename ShardRequestManager> -void TestCreateVertices(ShardRequestManager &shard_request_manager) { +void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manager) { using PropVal = msgs::Value; msgs::ExecutionState<msgs::CreateVerticesRequest> state; std::vector<msgs::NewVertex> new_vertices; - auto label_id = shard_request_manager.LabelNameToLabelId(kLabelName); + auto label_id = shard_request_manager.NameToLabel(kLabelName); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; @@ -133,8 +133,40 @@ void TestCreateVertices(ShardRequestManager &shard_request_manager) { EXPECT_EQ(result.size(), 1); } -template <typename ShardRequestManager> -void TestExpand(ShardRequestManager &shard_request_manager) {} +void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager) { + using PropVal = msgs::Value; + msgs::ExecutionState<msgs::CreateExpandRequest> state; + std::vector<msgs::NewExpand> new_expands; + + const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); + const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; + const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + msgs::NewExpand expand_1{ + .id = {.gid = 0}, .type = {edge_type_id}, .src_vertex = vertex_id_1, .dest_vertex = vertex_id_2}; + msgs::NewExpand expand_2{ + .id = {.gid = 1}, .type = {edge_type_id}, .src_vertex = vertex_id_2, .dest_vertex = vertex_id_1}; + new_expands.push_back(std::move(expand_1)); + new_expands.push_back(std::move(expand_2)); + + auto responses = shard_request_manager.Request(state, std::move(new_expands)); + MG_ASSERT(responses.size() == 1); + MG_ASSERT(responses[0].success); +} + +void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) { + msgs::ExecutionState<msgs::ExpandOneRequest> state{}; + msgs::ExpandOneRequest request; + const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); + const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); + request.edge_types.push_back(msgs::EdgeType{edge_type_id}); + request.direction = msgs::EdgeDirection::BOTH; + auto result_rows = shard_request_manager.Request(state, std::move(request)); + MG_ASSERT(result_rows.size() == 1); + MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1); + MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1); +} template <typename ShardRequestManager> void TestAggregate(ShardRequestManager &shard_request_manager) {} @@ -198,6 +230,8 @@ TEST(MachineManager, BasicFunctionality) { shard_request_manager.StartTransaction(); TestCreateVertices(shard_request_manager); TestScanAll(shard_request_manager); + TestCreateExpand(shard_request_manager); + TestExpandOne(shard_request_manager); local_system.ShutDown(); };