From 59c7d81ae85216d8251183960f0b1bb57ebd1b17 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 31 Oct 2022 11:52:20 +0200 Subject: [PATCH] [:pineapple: < T1086-MG] Test distributed operators e2e (#607) * Fix Explain queries * Add Vertex/Edge accessor support for properties * Fix projections * Fix expansions to fetch destination vertex properties * Fix improper use of ShardMap on bolt and replaced it with the ShardRequestManager * Add NameToId mappers on ShardRequestManager * Add e2e tests for operators * Fix OPTIONAL MATCH --- .github/workflows/diff.yaml | 9 + src/coordinator/shard_map.cpp | 5 +- src/coordinator/shard_map.hpp | 3 +- src/expr/ast.hpp | 4 +- src/expr/interpret/eval.hpp | 4 +- src/glue/v2/communication.cpp | 46 +++-- src/glue/v2/communication.hpp | 36 ++-- src/memgraph.cpp | 25 ++- src/query/v2/accessors.cpp | 68 ++++--- src/query/v2/accessors.hpp | 15 +- src/query/v2/bindings/eval.hpp | 15 +- src/query/v2/conversions.hpp | 12 +- .../interpret/awesome_memgraph_functions.cpp | 2 +- .../interpret/awesome_memgraph_functions.hpp | 11 +- src/query/v2/interpreter.cpp | 58 +++--- src/query/v2/interpreter.hpp | 2 + src/query/v2/plan/operator.cpp | 170 +++++++++++------- src/query/v2/requests.hpp | 1 - src/query/v2/shard_request_manager.hpp | 37 +++- src/storage/v3/name_id_mapper.hpp | 4 + src/storage/v3/shard_rsm.cpp | 52 ++++-- src/utils/print_helpers.hpp | 16 +- tests/e2e/distributed_queries/CMakeLists.txt | 5 + tests/e2e/distributed_queries/common.py | 44 +++++ tests/e2e/distributed_queries/distinct.py | 38 ++++ .../distributed_queries.py | 31 +--- .../e2e/distributed_queries/optional_match.py | 37 ++++ .../distributed_queries/order_by_and_limit.py | 44 +++++ .../e2e/distributed_queries/unwind_collect.py | 34 ++++ tests/e2e/distributed_queries/workloads.yaml | 20 +++ tests/setup.sh | 14 +- 31 files changed, 602 insertions(+), 260 deletions(-) create mode 100644 tests/e2e/distributed_queries/common.py create mode 100644 tests/e2e/distributed_queries/distinct.py create mode 100644 tests/e2e/distributed_queries/optional_match.py create mode 100644 tests/e2e/distributed_queries/order_by_and_limit.py create mode 100644 tests/e2e/distributed_queries/unwind_collect.py diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 00aca7f5c..ef5cf2ee2 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -219,3 +219,12 @@ jobs: # Run simulation tests. cd build ctest -R memgraph__simulation --output-on-failure -j$THREADS + + - name: Run e2e tests + run: | + # TODO(gitbuda): Setup mgclient and pymgclient properly. + cd tests + ./setup.sh + source ve3/bin/activate + cd e2e + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-root-directory ./distributed_queries diff --git a/src/coordinator/shard_map.cpp b/src/coordinator/shard_map.cpp index bd21dea3e..746f70368 100644 --- a/src/coordinator/shard_map.cpp +++ b/src/coordinator/shard_map.cpp @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. +#include #include #include @@ -365,7 +366,6 @@ std::optional ShardMap::GetLabelId(const std::string &label) const { if (const auto it = labels.find(label); it != labels.end()) { return it->second; } - return std::nullopt; } @@ -382,7 +382,6 @@ std::optional ShardMap::GetPropertyId(const std::string &property_na if (const auto it = properties.find(property_name); it != properties.end()) { return it->second; } - return std::nullopt; } @@ -399,7 +398,6 @@ std::optional ShardMap::GetEdgeTypeId(const std::string &edge_type) if (const auto it = edge_types.find(edge_type); it != edge_types.end()) { return it->second; } - return std::nullopt; } @@ -411,6 +409,7 @@ const std::string &ShardMap::GetEdgeTypeName(const EdgeTypeId property) const { } throw utils::BasicException("EdgeTypeId not found!"); } + Shards ShardMap::GetShardsForRange(const LabelName &label_name, const PrimaryKey &start_key, const PrimaryKey &end_key) const { MG_ASSERT(start_key <= end_key); diff --git a/src/coordinator/shard_map.hpp b/src/coordinator/shard_map.hpp index 63274aa76..d8b3e0f6a 100644 --- a/src/coordinator/shard_map.hpp +++ b/src/coordinator/shard_map.hpp @@ -25,6 +25,7 @@ #include "io/address.hpp" #include "storage/v3/config.hpp" #include "storage/v3/id_types.hpp" +#include "storage/v3/name_id_mapper.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/schemas.hpp" #include "storage/v3/temporal.hpp" @@ -120,9 +121,9 @@ struct ShardMap { std::map properties; std::map edge_types; uint64_t max_label_id{kNotExistingId}; - std::map labels; std::map label_spaces; std::map> schemas; + std::map labels; [[nodiscard]] static ShardMap Parse(std::istream &input_stream); friend std::ostream &operator<<(std::ostream &in, const ShardMap &shard_map); diff --git a/src/expr/ast.hpp b/src/expr/ast.hpp index 211d23ff4..070315a76 100644 --- a/src/expr/ast.hpp +++ b/src/expr/ast.hpp @@ -13,7 +13,7 @@ #ifndef MG_AST_INCLUDE_PATH #ifdef MG_CLANG_TIDY_CHECK // NOLINTNEXTLINE(cppcoreguidelines-macro-usage) -#define MG_AST_INCLUDE_PATH "query/v2/frontend/ast/ast.hpp" +#include "query/v2/bindings/bindings.hpp" #else #error Missing AST include path #endif @@ -21,8 +21,6 @@ #ifndef MG_INJECTED_NAMESPACE_NAME #ifdef MG_CLANG_TIDY_CHECK -// NOLINTNEXTLINE(cppcoreguidelines-macro-usage) -#define MG_INJECTED_NAMESPACE_NAME memgraph::query::v2 #else #error Missing AST namespace #endif diff --git a/src/expr/interpret/eval.hpp b/src/expr/interpret/eval.hpp index 253c2e86d..1538028a0 100644 --- a/src/expr/interpret/eval.hpp +++ b/src/expr/interpret/eval.hpp @@ -718,7 +718,7 @@ class ExpressionEvaluator : public ExpressionVisitor { TReturnType GetProperty(const TRecordAccessor &record_accessor, PropertyIx prop) { auto maybe_prop = record_accessor.GetProperty(prop.name); // Handler non existent property - return conv_(maybe_prop); + return conv_(maybe_prop, dba_); } template { TReturnType GetProperty(const TRecordAccessor &record_accessor, const std::string_view name) { auto maybe_prop = record_accessor.GetProperty(std::string(name)); // Handler non existent property - return conv_(maybe_prop); + return conv_(maybe_prop, dba_); } template ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex, - const coordinator::ShardMap &shard_map, - storage::v3::View /*view*/) { +storage::v3::Result ToBoltVertex( + const query::v2::accessors::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View /*view*/) { auto id = communication::bolt::Id::FromUint(0); auto labels = vertex.Labels(); std::vector new_labels; new_labels.reserve(labels.size()); for (const auto &label : labels) { - new_labels.push_back(shard_map.GetLabelName(label.id)); + new_labels.push_back(shard_request_manager->LabelToName(label.id)); } auto properties = vertex.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value); + new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Vertex{id, new_labels, new_properties}; } -storage::v3::Result ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge, - const coordinator::ShardMap &shard_map, - storage::v3::View /*view*/) { +storage::v3::Result ToBoltEdge( + const query::v2::accessors::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication auto id = communication::bolt::Id::FromUint(0); auto from = communication::bolt::Id::FromUint(0); auto to = communication::bolt::Id::FromUint(0); - const auto &type = shard_map.GetEdgeTypeName(edge.EdgeType()); + const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType()); auto properties = edge.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_map.GetPropertyName(prop)] = ToBoltValue(property_value); + new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Edge{id, from, to, type, new_properties}; } -storage::v3::Result ToBoltPath(const query::v2::accessors::Path & /*edge*/, - const coordinator::ShardMap & /*shard_map*/, - storage::v3::View /*view*/) { +storage::v3::Result ToBoltPath( + const query::v2::accessors::Path & /*edge*/, const msgs::ShardRequestManagerInterface * /*shard_request_manager*/, + storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication return {storage::v3::Error::DELETED_OBJECT}; } -storage::v3::Result ToBoltValue(const query::v2::TypedValue &value, const coordinator::ShardMap &shard_map, +storage::v3::Result ToBoltValue(const query::v2::TypedValue &value, + const msgs::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) { switch (value.type()) { case query::v2::TypedValue::Type::Null: @@ -131,7 +133,7 @@ storage::v3::Result ToBoltValue(const query::v2::TypedValue &value, const std::vector values; values.reserve(value.ValueList().size()); for (const auto &v : value.ValueList()) { - auto maybe_value = ToBoltValue(v, shard_map, view); + auto maybe_value = ToBoltValue(v, shard_request_manager, view); if (maybe_value.HasError()) return maybe_value.GetError(); values.emplace_back(std::move(*maybe_value)); } @@ -140,24 +142,24 @@ storage::v3::Result ToBoltValue(const query::v2::TypedValue &value, const case query::v2::TypedValue::Type::Map: { std::map map; for (const auto &kv : value.ValueMap()) { - auto maybe_value = ToBoltValue(kv.second, shard_map, view); + auto maybe_value = ToBoltValue(kv.second, shard_request_manager, view); if (maybe_value.HasError()) return maybe_value.GetError(); map.emplace(kv.first, std::move(*maybe_value)); } return Value(std::move(map)); } case query::v2::TypedValue::Type::Vertex: { - auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_map, view); + auto maybe_vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view); if (maybe_vertex.HasError()) return maybe_vertex.GetError(); return Value(std::move(*maybe_vertex)); } case query::v2::TypedValue::Type::Edge: { - auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_map, view); + auto maybe_edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view); if (maybe_edge.HasError()) return maybe_edge.GetError(); return Value(std::move(*maybe_edge)); } case query::v2::TypedValue::Type::Path: { - auto maybe_path = ToBoltPath(value.ValuePath(), shard_map, view); + auto maybe_path = ToBoltPath(value.ValuePath(), shard_request_manager, view); if (maybe_path.HasError()) return maybe_path.GetError(); return Value(std::move(*maybe_path)); } @@ -209,12 +211,6 @@ Value ToBoltValue(msgs::Value value) { } } -storage::v3::Result ToBoltPath(const query::v2::accessors::Path & /*path*/, - const storage::v3::Shard & /*db*/, - storage::v3::View /*view*/) { - return communication::bolt::Path(); -} - storage::v3::PropertyValue ToPropertyValue(const Value &value) { switch (value.type()) { case Value::Type::Null: diff --git a/src/glue/v2/communication.hpp b/src/glue/v2/communication.hpp index 67a951c6f..ea9c6b4c9 100644 --- a/src/glue/v2/communication.hpp +++ b/src/glue/v2/communication.hpp @@ -15,6 +15,7 @@ #include "communication/bolt/v1/value.hpp" #include "coordinator/shard_map.hpp" #include "query/v2/bindings/typed_value.hpp" +#include "query/v2/shard_request_manager.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/result.hpp" #include "storage/v3/shard.hpp" @@ -30,40 +31,40 @@ namespace memgraph::glue::v2 { /// @param storage::v3::VertexAccessor for converting to /// communication::bolt::Vertex. -/// @param coordinator::ShardMap shard_map getting label and property names. +/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names. /// @param storage::v3::View for deciding which vertex attributes are visible. /// /// @throw std::bad_alloc -storage::v3::Result ToBoltVertex(const storage::v3::VertexAccessor &vertex, - const coordinator::ShardMap &shard_map, - storage::v3::View view); +storage::v3::Result ToBoltVertex( + const storage::v3::VertexAccessor &vertex, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View view); /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge. -/// @param coordinator::ShardMap shard_map getting edge type and property names. +/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. /// @param storage::v3::View for deciding which edge attributes are visible. /// /// @throw std::bad_alloc -storage::v3::Result ToBoltEdge(const storage::v3::EdgeAccessor &edge, - const coordinator::ShardMap &shard_map, - storage::v3::View view); +storage::v3::Result ToBoltEdge( + const storage::v3::EdgeAccessor &edge, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View view); /// @param query::v2::Path for converting to communication::bolt::Path. -/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge. +/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc -storage::v3::Result ToBoltPath(const query::v2::accessors::Path &path, - const coordinator::ShardMap &shard_map, - storage::v3::View view); +storage::v3::Result ToBoltPath( + const query::v2::accessors::Path &path, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View view); /// @param query::v2::TypedValue for converting to communication::bolt::Value. -/// @param coordinator::ShardMap shard_map ToBoltVertex and ToBoltEdge. +/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc -storage::v3::Result ToBoltValue(const query::v2::TypedValue &value, - const coordinator::ShardMap &shard_map, - storage::v3::View view); +storage::v3::Result ToBoltValue( + const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager, + storage::v3::View view); query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value); @@ -73,7 +74,8 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val communication::bolt::Value ToBoltValue(msgs::Value value); -communication::bolt::Value ToBoltValue(msgs::Value value, const coordinator::ShardMap &shard_map, +communication::bolt::Value ToBoltValue(msgs::Value value, + const msgs::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); } // namespace memgraph::glue::v2 diff --git a/src/memgraph.cpp b/src/memgraph.cpp index c696615b5..301b3ed36 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -407,9 +407,8 @@ DEFINE_string(organization_name, "", "Organization name."); struct SessionData { // Explicit constructor here to ensure that pointers to all objects are // supplied. - SessionData(memgraph::coordinator::ShardMap &shard_map, memgraph::query::v2::InterpreterContext *interpreter_context) - : shard_map(&shard_map), interpreter_context(interpreter_context) {} - memgraph::coordinator::ShardMap *shard_map; + explicit SessionData(memgraph::query::v2::InterpreterContext *interpreter_context) + : interpreter_context(interpreter_context) {} memgraph::query::v2::InterpreterContext *interpreter_context; }; @@ -424,7 +423,6 @@ class BoltSession final : public memgraph::communication::bolt::Session(input_stream, output_stream), - shard_map_(data.shard_map), interpreter_(data.interpreter_context), endpoint_(endpoint) {} @@ -455,7 +453,7 @@ class BoltSession final : public memgraph::communication::bolt::Session Pull(TEncoder *encoder, std::optional n, std::optional qid) override { - TypedValueResultStream stream(encoder, *shard_map_); + TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager()); return PullResults(stream, n, qid); } @@ -482,7 +480,8 @@ class BoltSession final : public memgraph::communication::bolt::Session decoded_summary; for (const auto &kv : summary) { - auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, *shard_map_, memgraph::storage::v3::View::NEW); + auto maybe_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(), + memgraph::storage::v3::View::NEW); if (maybe_value.HasError()) { switch (maybe_value.GetError()) { case memgraph::storage::v3::Error::DELETED_OBJECT: @@ -507,14 +506,14 @@ class BoltSession final : public memgraph::communication::bolt::Session &values) { std::vector decoded_values; decoded_values.reserve(values.size()); for (const auto &v : values) { - auto maybe_value = memgraph::glue::v2::ToBoltValue(v, *shard_map_, memgraph::storage::v3::View::NEW); + auto maybe_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW); if (maybe_value.HasError()) { switch (maybe_value.GetError()) { case memgraph::storage::v3::Error::DELETED_OBJECT: @@ -534,12 +533,8 @@ class BoltSession final : public memgraph::communication::bolt::Session> &EdgeAccessor::Properties() const { - return edge.properties; - // std::map res; - // for (const auto &[name, value] : *properties) { - // res[name] = ValueToTypedValue(value); - // } - // return res; -} +const std::vector> &EdgeAccessor::Properties() const { return edge.properties; } -// NOLINTNEXTLINE(readability-convert-member-functions-to-static) -Value EdgeAccessor::GetProperty(const std::string & /*prop_name*/) const { - // TODO(kostasrim) fix this - return {}; +Value EdgeAccessor::GetProperty(const std::string &prop_name) const { + auto prop_id = manager_->NameToProperty(prop_name); + auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; }); + if (it == edge.properties.end()) { + return {}; + } + return it->second; } const Edge &EdgeAccessor::GetEdge() const { return edge; } bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; }; -VertexAccessor EdgeAccessor::To() const { return VertexAccessor(Vertex{edge.dst}, {}); } +VertexAccessor EdgeAccessor::To() const { + return VertexAccessor(Vertex{edge.dst}, std::vector>{}, manager_); +} -VertexAccessor EdgeAccessor::From() const { return VertexAccessor(Vertex{edge.src}, {}); } +VertexAccessor EdgeAccessor::From() const { + return VertexAccessor(Vertex{edge.src}, std::vector>{}, manager_); +} -VertexAccessor::VertexAccessor(Vertex v, std::vector> props) - : vertex(std::move(v)), properties(std::move(props)) {} +VertexAccessor::VertexAccessor(Vertex v, std::vector> props, + const msgs::ShardRequestManagerInterface *manager) + : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} + +VertexAccessor::VertexAccessor(Vertex v, std::map &&props, + const msgs::ShardRequestManagerInterface *manager) + : vertex(std::move(v)), manager_(manager) { + properties.reserve(props.size()); + for (auto &[id, value] : props) { + properties.emplace_back(std::make_pair(id, std::move(value))); + } +} + +VertexAccessor::VertexAccessor(Vertex v, const std::map &props, + const msgs::ShardRequestManagerInterface *manager) + : vertex(std::move(v)), manager_(manager) { + properties.reserve(props.size()); + for (const auto &[id, value] : props) { + properties.emplace_back(std::make_pair(id, value)); + } +} Label VertexAccessor::PrimaryLabel() const { return vertex.id.first; } @@ -58,15 +79,16 @@ bool VertexAccessor::HasLabel(Label &label) const { const std::vector> &VertexAccessor::Properties() const { return properties; } Value VertexAccessor::GetProperty(PropertyId prop_id) const { - return std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; })->second; - // return ValueToTypedValue(properties[prop_name]); + auto it = std::find_if(properties.begin(), properties.end(), [&](auto &pr) { return prop_id == pr.first; }); + if (it == properties.end()) { + return {}; + } + return it->second; } // NOLINTNEXTLINE(readability-convert-member-functions-to-static) -Value VertexAccessor::GetProperty(const std::string & /*prop_name*/) const { - // TODO(kostasrim) Add string mapping - return {}; - // return ValueToTypedValue(properties[prop_name]); +Value VertexAccessor::GetProperty(const std::string &prop_name) const { + return GetProperty(manager_->NameToProperty(prop_name)); } msgs::Vertex VertexAccessor::GetVertex() const { return vertex; } diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index eed169b8a..8e10c865d 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -11,6 +11,7 @@ #pragma once +#include #include #include #include @@ -23,6 +24,10 @@ #include "utils/memory.hpp" #include "utils/memory_tracker.hpp" +namespace memgraph::msgs { +class ShardRequestManagerInterface; +} // namespace memgraph::msgs + namespace memgraph::query::v2::accessors { using Value = memgraph::msgs::Value; @@ -36,7 +41,7 @@ class VertexAccessor; class EdgeAccessor final { public: - explicit EdgeAccessor(Edge edge); + explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager); [[nodiscard]] EdgeTypeId EdgeType() const; @@ -64,6 +69,7 @@ class EdgeAccessor final { private: Edge edge; + const msgs::ShardRequestManagerInterface *manager_; }; class VertexAccessor final { @@ -71,7 +77,11 @@ class VertexAccessor final { using PropertyId = msgs::PropertyId; using Label = msgs::Label; using VertexId = msgs::VertexId; - VertexAccessor(Vertex v, std::vector> props); + VertexAccessor(Vertex v, std::vector> props, + const msgs::ShardRequestManagerInterface *manager); + + VertexAccessor(Vertex v, std::map &&props, const msgs::ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, const std::map &props, const msgs::ShardRequestManagerInterface *manager); [[nodiscard]] Label PrimaryLabel() const; @@ -140,6 +150,7 @@ class VertexAccessor final { private: Vertex vertex; std::vector> properties; + const msgs::ShardRequestManagerInterface *manager_; }; // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); } diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp index 8c4ad6d34..52455bdb2 100644 --- a/src/query/v2/bindings/eval.hpp +++ b/src/query/v2/bindings/eval.hpp @@ -23,6 +23,10 @@ #include "storage/v3/property_value.hpp" #include "storage/v3/view.hpp" +namespace memgraph::msgs { +class ShardRequestManagerInterface; +} // namespace memgraph::msgs + namespace memgraph::query::v2 { inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); }; @@ -32,13 +36,14 @@ class Callable { auto operator()(const memgraph::storage::v3::PropertyValue &val) const { return memgraph::storage::v3::PropertyToTypedValue(val); }; - auto operator()(const msgs::Value &val) const { return ValueToTypedValue(val); }; + auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const { + return ValueToTypedValue(val, manager); + }; }; } // namespace detail -using ExpressionEvaluator = - memgraph::expr::ExpressionEvaluator; +using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator< + TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View, + storage::v3::LabelId, msgs::Value, detail::Callable, memgraph::storage::v3::Error, memgraph::expr::QueryEngineTag>; } // namespace memgraph::query::v2 diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index 0c67c794b..10299c919 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -13,10 +13,11 @@ #include "bindings/typed_value.hpp" #include "query/v2/accessors.hpp" #include "query/v2/requests.hpp" +#include "query/v2/shard_request_manager.hpp" namespace memgraph::query::v2 { -inline TypedValue ValueToTypedValue(const msgs::Value &value) { +inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) { using Value = msgs::Value; switch (value.type) { case Value::Type::Null: @@ -34,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) { std::vector dst; dst.reserve(lst.size()); for (const auto &elem : lst) { - dst.push_back(ValueToTypedValue(elem)); + dst.push_back(ValueToTypedValue(elem, manager)); } return TypedValue(std::move(dst)); } @@ -42,14 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value) { const auto &value_map = value.map_v; std::map dst; for (const auto &[key, val] : value_map) { - dst[key] = ValueToTypedValue(val); + dst[key] = ValueToTypedValue(val, manager); } return TypedValue(std::move(dst)); } case Value::Type::Vertex: - return TypedValue(accessors::VertexAccessor(value.vertex_v, {})); + return TypedValue(accessors::VertexAccessor( + value.vertex_v, std::vector>{}, manager)); case Value::Type::Edge: - return TypedValue(accessors::EdgeAccessor(value.edge_v)); + return TypedValue(accessors::EdgeAccessor(value.edge_v, manager)); } throw std::runtime_error("Incorrect type in conversion"); } diff --git a/src/query/v2/interpret/awesome_memgraph_functions.cpp b/src/query/v2/interpret/awesome_memgraph_functions.cpp index 46b0fb9fe..8768736b2 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.cpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.cpp @@ -22,8 +22,8 @@ #include "query/v2/bindings/typed_value.hpp" #include "query/v2/conversions.hpp" -#include "query/v2/db_accessor.hpp" #include "query/v2/exceptions.hpp" +#include "query/v2/shard_request_manager.hpp" #include "storage/v3/conversions.hpp" #include "utils/string.hpp" #include "utils/temporal.hpp" diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp index d15ef5a9b..134f05d7d 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.hpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp @@ -16,12 +16,15 @@ #include #include "query/v2/bindings/typed_value.hpp" +#include "query/v2/db_accessor.hpp" #include "storage/v3/view.hpp" #include "utils/memory.hpp" -namespace memgraph::query::v2 { +namespace memgraph::msgs { +class ShardRequestManagerInterface; +} // namespace memgraph::msgs -class DbAccessor; +namespace memgraph::query::v2 { namespace { const char kStartsWith[] = "STARTSWITH"; @@ -31,7 +34,9 @@ const char kId[] = "ID"; } // namespace struct FunctionContext { - DbAccessor *db_accessor; + // TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage. + // DbAccessor *db_accessor; + msgs::ShardRequestManagerInterface *manager; utils::MemoryResource *memory; int64_t timestamp; std::unordered_map *counters; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index c02497030..a1637b4c3 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters, - DbAccessor *db_accessor) { + msgs::ShardRequestManagerInterface *manager) { // Empty frame for evaluation of password expression. This is OK since // password should be either null or string literal and it's evaluation // should not depend on frame. @@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); std::string username = auth_query->user_; std::string rolename = auth_query->role_; @@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, - InterpreterContext *interpreter_context, DbAccessor *db_accessor, + InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager, std::vector *notifications) { expr::Frame frame(0); SymbolTable symbol_table; @@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); Callback callback; switch (repl_query->action_) { @@ -448,7 +448,8 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } } -Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, DbAccessor *db_accessor) { +Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, + msgs::ShardRequestManagerInterface *manager) { expr::Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -458,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶m std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) .count(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, db_accessor, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); Callback callback; switch (setting_query->action_) { @@ -886,7 +887,8 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapmemory_limit_, cypher_query->memory_scale_); if (memory_limit) { @@ -901,7 +903,6 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapStartTransaction(); auto plan = CypherQueryToPlan( parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters, parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager); @@ -957,10 +958,10 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map(parsed_inner_query.query); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN"); - auto cypher_query_plan = - CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), - cypher_query, parsed_inner_query.parameters, - parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, nullptr); + auto cypher_query_plan = CypherQueryToPlan( + parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, + parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, + shard_request_manager); std::stringstream printed_plan; plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan); @@ -1030,7 +1031,8 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra EvaluationContext evaluation_context; evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parsed_inner_query.parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, dba, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager, + storage::v3::View::OLD); const auto memory_limit = expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_); @@ -1179,14 +1181,15 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, - DbAccessor *dba, utils::MemoryResource *execution_memory) { + DbAccessor *dba, utils::MemoryResource *execution_memory, + msgs::ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } auto *auth_query = utils::Downcast(parsed_query.query); - auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, dba); + auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager); SymbolTable symbol_table; std::vector output_symbols; @@ -1215,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, std::vector *notifications, InterpreterContext *interpreter_context, - DbAccessor *dba) { + msgs::ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } auto *replication_query = utils::Downcast(parsed_query.query); auto callback = - HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, dba, notifications); + HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -1310,14 +1313,15 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli throw SemanticException("CreateSnapshot query is not supported!"); } -PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, DbAccessor *dba) { +PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, + msgs::ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw SettingConfigInMulticommandTxException{}; } auto *setting_query = utils::Downcast(parsed_query.query); MG_ASSERT(setting_query); - auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, dba); + auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager); return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -1511,6 +1515,11 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, ParsedQuery parsed_query = ParseQuery(query_string, params, &interpreter_context_->ast_cache, interpreter_context_->config.query); query_execution->summary["parsing_time"] = parsing_timer.Elapsed().count(); + if (!in_explicit_transaction_ && + (utils::Downcast(parsed_query.query) || utils::Downcast(parsed_query.query) || + utils::Downcast(parsed_query.query))) { + shard_request_manager_->StartTransaction(); + } utils::Timer planning_timer; PreparedQuery prepared_query; @@ -1533,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, - interpreter_context_, &*execution_db_accessor_, - &query_execution->execution_memory_with_exception); + prepared_query = PrepareAuthQuery( + std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, + &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, interpreter_context_->db, @@ -1546,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, - interpreter_context_, &*execution_db_accessor_); + interpreter_context_, shard_request_manager_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, &*execution_db_accessor_); @@ -1563,7 +1572,8 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, &*execution_db_accessor_); + prepared_query = + PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); } else if (utils::Downcast(parsed_query.query)) { diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index 74484ded9..dc413ef44 100644 --- a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -296,6 +296,8 @@ class Interpreter final { */ void Abort(); + const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } + private: struct QueryExecution { std::optional prepared_query; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 4243c6bd1..0832c60cf 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -169,6 +169,7 @@ class DistributedCreateNodeCursor : public Cursor { if (input_cursor_->Pull(frame, context)) { auto &shard_manager = context.shard_request_manager; shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + PlaceNodeOnTheFrame(frame, context); return true; } @@ -179,8 +180,19 @@ class DistributedCreateNodeCursor : public Cursor { void Reset() override { state_ = {}; } - std::vector NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) const { + void PlaceNodeOnTheFrame(Frame &frame, ExecutionContext &context) { + // TODO(kostasrim) Make this work with batching + const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; + msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])}; + frame[nodes_info_.front()->symbol] = TypedValue( + query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager)); + } + + std::vector NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { std::vector requests; + // TODO(kostasrim) this assertion should be removed once we support multiple vertex creation + MG_ASSERT(nodes_info_.size() == 1); + msgs::PrimaryKey pk; for (const auto &node_info : nodes_info_) { msgs::NewVertex rqst; MG_ASSERT(!node_info->labels.empty(), "Cannot determine primary label"); @@ -188,17 +200,14 @@ class DistributedCreateNodeCursor : public Cursor { // TODO(jbajic) Fix properties not send, // suggestion: ignore distinction between properties and primary keys // since schema validation is done on storage side - std::map properties; ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, nullptr, storage::v3::View::NEW); if (const auto *node_info_properties = std::get_if(&node_info->properties)) { for (const auto &[key, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); - if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) { rqst.primary_key.push_back(TypedValueToValue(val)); - } else { - properties[key] = TypedValueToValue(val); + pk.push_back(TypedValueToValue(val)); } } } else { @@ -207,9 +216,8 @@ class DistributedCreateNodeCursor : public Cursor { auto key_str = std::string(key); auto property_id = context.shard_request_manager->NameToProperty(key_str); if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) { - rqst.primary_key.push_back(storage::v3::TypedValueToValue(value)); - } else { - properties[property_id] = TypedValueToValue(value); + rqst.primary_key.push_back(TypedValueToValue(value)); + pk.push_back(TypedValueToValue(value)); } } } @@ -219,14 +227,18 @@ class DistributedCreateNodeCursor : public Cursor { } // TODO(kostasrim) Copy non primary labels as well rqst.label_ids.push_back(msgs::Label{.id = primary_label}); + src_vertex_props_.push_back(rqst.properties); requests.push_back(std::move(rqst)); } + primary_keys_.push_back(std::move(pk)); return requests; } private: const UniqueCursorPtr input_cursor_; std::vector nodes_info_; + std::vector>> src_vertex_props_; + std::vector primary_keys_; msgs::ExecutionState state_; }; @@ -687,7 +699,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) { // Like all filters, newly set values should not affect filtering of old // nodes and edges. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager, storage::v3::View::OLD); while (input_cursor_->Pull(frame, context)) { if (EvaluateFilter(evaluator, self_.expression_)) return true; @@ -728,8 +740,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) { if (input_cursor_->Pull(frame, context)) { // Produce should always yield the latest results. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::NEW); for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator); return true; @@ -1149,8 +1161,8 @@ class AggregateCursor : public Cursor { * aggregation results, and not on the number of inputs. */ void ProcessAll(Frame *frame, ExecutionContext *context) { - ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->db_accessor, - storage::v3::View::NEW); + ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, + context->shard_request_manager, storage::v3::View::NEW); while (input_cursor_->Pull(*frame, *context)) { ProcessOne(*frame, &evaluator); } @@ -1370,8 +1382,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) { // First successful pull from the input, evaluate the skip expression. // The skip expression doesn't contain identifiers so graph view // parameter is not important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::OLD); TypedValue to_skip = self_.expression_->Accept(evaluator); if (to_skip.type() != TypedValue::Type::Int) throw QueryRuntimeException("Number of elements to skip must be an integer."); @@ -1425,8 +1437,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) { if (limit_ == -1) { // Limit expression doesn't contain identifiers so graph view is not // important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::OLD); TypedValue limit = self_.expression_->Accept(evaluator); if (limit.type() != TypedValue::Type::Int) throw QueryRuntimeException("Limit on number of returned elements must be an integer."); @@ -1481,8 +1493,8 @@ class OrderByCursor : public Cursor { SCOPED_PROFILE_OP("OrderBy"); if (!did_pull_all_) { - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::OLD); auto *mem = cache_.get_allocator().GetMemoryResource(); while (input_cursor_->Pull(frame, context)) { // collect the order_by elements @@ -1739,8 +1751,8 @@ class UnwindCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) return false; // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::OLD); TypedValue input_value = self_.input_expression_->Accept(evaluator); if (input_value.type() != TypedValue::Type::List) throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); @@ -2217,7 +2229,7 @@ class LoadCsvCursor : public Cursor { // self_->delimiter_, and self_->quote_ earlier (say, in the interpreter.cpp) // without massacring the code even worse than I did here if (UNLIKELY(!reader_)) { - reader_ = MakeReader(&context.evaluation_context); + reader_ = MakeReader(context); } bool input_pulled = input_cursor_->Pull(frame, context); @@ -2246,11 +2258,12 @@ class LoadCsvCursor : public Cursor { void Shutdown() override { input_cursor_->Shutdown(); } private: - csv::Reader MakeReader(EvaluationContext *eval_context) { + csv::Reader MakeReader(ExecutionContext &context) { + auto &eval_context = context.evaluation_context; Frame frame(0); SymbolTable symbol_table; - DbAccessor *dba = nullptr; - auto evaluator = ExpressionEvaluator(&frame, symbol_table, *eval_context, dba, storage::v3::View::OLD); + auto evaluator = + ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD); auto maybe_file = ToOptionalString(&evaluator, self_->file_); auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); @@ -2287,8 +2300,8 @@ class ForeachCursor : public Cursor { return false; } - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.db_accessor, - storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, + context.shard_request_manager, storage::v3::View::NEW); TypedValue expr_result = expression->Accept(evaluator); if (expr_result.IsNull()) { @@ -2458,15 +2471,51 @@ class DistributedExpandCursor : public Cursor { : self_(self), input_cursor_(self.input_->MakeCursor(mem)), current_in_edge_it_(current_in_edges_.begin()), - current_out_edge_it_(current_out_edges_.begin()) { - if (self_.common_.existing_node) { - throw QueryRuntimeException("Cannot use existing node with DistributedExpandOne cursor!"); - } - } + current_out_edge_it_(current_out_edges_.begin()) {} using VertexAccessor = accessors::VertexAccessor; using EdgeAccessor = accessors::EdgeAccessor; + static constexpr auto DirectionToMsgsDirection(const auto direction) { + switch (direction) { + case EdgeAtom::Direction::IN: + return msgs::EdgeDirection::IN; + case EdgeAtom::Direction::OUT: + return msgs::EdgeDirection::OUT; + case EdgeAtom::Direction::BOTH: + return msgs::EdgeDirection::BOTH; + } + }; + + void PullDstVertex(Frame &frame, ExecutionContext &context, const EdgeAtom::Direction direction) { + if (self_.common_.existing_node) { + return; + } + MG_ASSERT(direction != EdgeAtom::Direction::BOTH); + const auto &edge = frame[self_.common_.edge_symbol].ValueEdge(); + static auto get_dst_vertex = [&edge](const EdgeAtom::Direction direction) { + switch (direction) { + case EdgeAtom::Direction::IN: + return edge.From().Id(); + case EdgeAtom::Direction::OUT: + return edge.To().Id(); + case EdgeAtom::Direction::BOTH: + throw std::runtime_error("EdgeDirection Both not implemented"); + } + }; + msgs::ExpandOneRequest request; + // to not fetch any properties of the edges + request.edge_properties.emplace(); + request.src_vertices.push_back(get_dst_vertex(direction)); + request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; + msgs::ExecutionState request_state; + auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + MG_ASSERT(result_rows.size() == 1); + auto &result_row = result_rows.front(); + frame[self_.common_.node_symbol] = accessors::VertexAccessor( + msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager); + } + bool InitEdges(Frame &frame, ExecutionContext &context) { // Input Vertex could be null if it is created by a failed optional match. In // those cases we skip that input pull and continue with the next. @@ -2480,44 +2529,41 @@ class DistributedExpandCursor : public Cursor { ExpectType(self_.input_symbol_, vertex_value, TypedValue::Type::Vertex); auto &vertex = vertex_value.ValueVertex(); - static constexpr auto direction_to_msgs_direction = [](const EdgeAtom::Direction direction) { - switch (direction) { - case EdgeAtom::Direction::IN: - return msgs::EdgeDirection::IN; - case EdgeAtom::Direction::OUT: - return msgs::EdgeDirection::OUT; - case EdgeAtom::Direction::BOTH: - return msgs::EdgeDirection::BOTH; - } - }; - msgs::ExpandOneRequest request; - request.direction = direction_to_msgs_direction(self_.common_.direction); + request.direction = DirectionToMsgsDirection(self_.common_.direction); // to not fetch any properties of the edges request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); msgs::ExecutionState request_state; auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); - MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); - const auto convert_edges = [&vertex]( + if (self_.common_.existing_node) { + const auto &node = frame[self_.common_.node_symbol].ValueVertex().Id(); + auto &in = result_row.in_edges_with_specific_properties; + std::erase_if(in, [&node](auto &edge) { return edge.other_end != node; }); + auto &out = result_row.out_edges_with_specific_properties; + std::erase_if(out, [&node](auto &edge) { return edge.other_end != node; }); + } + + const auto convert_edges = [&vertex, &context]( std::vector &&edge_messages, const EdgeAtom::Direction direction) { std::vector edge_accessors; edge_accessors.reserve(edge_messages.size()); + switch (direction) { case EdgeAtom::Direction::IN: { for (auto &edge : edge_messages) { - edge_accessors.emplace_back( - msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}); + edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}, + context.shard_request_manager); } break; } case EdgeAtom::Direction::OUT: { for (auto &edge : edge_messages) { - edge_accessors.emplace_back( - msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}); + edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}, + context.shard_request_manager); } break; } @@ -2527,12 +2573,13 @@ class DistributedExpandCursor : public Cursor { } return edge_accessors; }; + current_in_edges_ = convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::IN); current_in_edge_it_ = current_in_edges_.begin(); - current_in_edges_ = - convert_edges(std::move(result_row.in_edges_with_specific_properties), EdgeAtom::Direction::OUT); - current_in_edge_it_ = current_in_edges_.begin(); + current_out_edges_ = + convert_edges(std::move(result_row.out_edges_with_specific_properties), EdgeAtom::Direction::OUT); + current_out_edge_it_ = current_out_edges_.begin(); return true; } } @@ -2540,19 +2587,6 @@ class DistributedExpandCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("DistributedExpand"); // A helper function for expanding a node from an edge. - auto pull_node = [this, &frame](const EdgeAccessor &new_edge, EdgeAtom::Direction direction) { - if (self_.common_.existing_node) return; - switch (direction) { - case EdgeAtom::Direction::IN: - frame[self_.common_.node_symbol] = new_edge.From(); - break; - case EdgeAtom::Direction::OUT: - frame[self_.common_.node_symbol] = new_edge.To(); - break; - case EdgeAtom::Direction::BOTH: - LOG_FATAL("Must indicate exact expansion direction here"); - } - }; while (true) { if (MustAbort(context)) throw HintedAbortError(); @@ -2561,7 +2595,7 @@ class DistributedExpandCursor : public Cursor { auto &edge = *current_in_edge_it_; ++current_in_edge_it_; frame[self_.common_.edge_symbol] = edge; - pull_node(edge, EdgeAtom::Direction::IN); + PullDstVertex(frame, context, EdgeAtom::Direction::IN); return true; } @@ -2573,7 +2607,7 @@ class DistributedExpandCursor : public Cursor { continue; }; frame[self_.common_.edge_symbol] = edge; - pull_node(edge, EdgeAtom::Direction::OUT); + PullDstVertex(frame, context, EdgeAtom::Direction::OUT); return true; } diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 49ae346cb..2ea4efb57 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -387,7 +387,6 @@ struct GetPropertiesResponse { enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 }; struct ExpandOneRequest { - // TODO(antaljanosbenjamin): Filtering based on the id of the other end of the edge? Hlc transaction_id; std::vector src_vertices; // return types that type is in this list diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 79794aa1a..a73201046 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -171,6 +171,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { if (hlc_response.fresher_shard_map) { shards_map_ = hlc_response.fresher_shard_map.value(); + SetUpNameIdMappers(); } } @@ -186,6 +187,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { if (hlc_response.fresher_shard_map) { shards_map_ = hlc_response.fresher_shard_map.value(); + SetUpNameIdMappers(); } auto commit_timestamp = hlc_response.new_hlc; @@ -223,14 +225,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { return shards_map_.GetLabelId(name).value(); } - const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const override { - return shards_map_.GetPropertyName(prop); + const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { + return properties_.IdToName(id.AsUint()); } - const std::string &LabelToName(memgraph::storage::v3::LabelId label) const override { - return shards_map_.GetLabelName(label); + const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override { + return labels_.IdToName(id.AsUint()); } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const override { - return shards_map_.GetEdgeTypeName(type); + const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override { + return edge_types_.IdToName(id.AsUint()); } bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { @@ -358,7 +360,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { std::vector accessors; for (auto &response : responses) { for (auto &result_row : response.results) { - accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props))); + accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this)); } } return accessors; @@ -697,7 +699,28 @@ class ShardRequestManager : public ShardRequestManagerInterface { } } + void SetUpNameIdMappers() { + std::unordered_map id_to_name; + for (const auto &[name, id] : shards_map_.labels) { + id_to_name.emplace(id.AsUint(), name); + } + labels_.StoreMapping(std::move(id_to_name)); + id_to_name.clear(); + for (const auto &[name, id] : shards_map_.properties) { + id_to_name.emplace(id.AsUint(), name); + } + properties_.StoreMapping(std::move(id_to_name)); + id_to_name.clear(); + for (const auto &[name, id] : shards_map_.edge_types) { + id_to_name.emplace(id.AsUint(), name); + } + edge_types_.StoreMapping(std::move(id_to_name)); + } + ShardMap shards_map_; + storage::v3::NameIdMapper properties_; + storage::v3::NameIdMapper edge_types_; + storage::v3::NameIdMapper labels_; CoordinatorClient coord_cli_; RsmStorageClientManager storage_cli_manager_; memgraph::io::Io io_; diff --git a/src/storage/v3/name_id_mapper.hpp b/src/storage/v3/name_id_mapper.hpp index 94ed62625..f241bca54 100644 --- a/src/storage/v3/name_id_mapper.hpp +++ b/src/storage/v3/name_id_mapper.hpp @@ -53,6 +53,10 @@ class NameIdMapper final { return it->second; } + const auto &GetIdToNameMap() const { return id_to_name_; } + + const auto &GetNameToIdMap() const { return name_to_id_; } + private: // Necessary for comparison with string_view nad string // https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0919r1.html diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index a875e0164..9a124b250 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -17,6 +17,7 @@ #include "parser/opencypher/parser.hpp" #include "query/v2/requests.hpp" +#include "storage/v2/view.hpp" #include "storage/v3/bindings/ast/ast.hpp" #include "storage/v3/bindings/cypher_main_visitor.hpp" #include "storage/v3/bindings/db_accessor.hpp" @@ -113,6 +114,24 @@ std::optional> CollectSpecificPropertiesFromAccessor return ret; } +std::optional> PrimaryKeysFromAccessor(const VertexAccessor &acc, View view, + const Schemas::Schema *schema) { + std::map ret; + auto props = acc.Properties(view); + auto maybe_pk = acc.PrimaryKey(view); + if (maybe_pk.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex primary key."); + return std::nullopt; + } + auto &pk = maybe_pk.GetValue(); + MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!"); + for (size_t i{0}; i < schema->second.size(); ++i) { + ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); + } + + return ret; +} + std::optional> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema *schema) { std::map ret; @@ -129,17 +148,9 @@ std::optional> CollectAllPropertiesFromAccessor(cons }); properties.clear(); - // TODO(antaljanosbenjamin): Once the VertexAccessor::Properties returns also the primary keys, we can get rid of this - // code. - auto maybe_pk = acc.PrimaryKey(view); - if (maybe_pk.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex primary key."); - } - - auto &pk = maybe_pk.GetValue(); - MG_ASSERT(schema->second.size() == pk.size(), "PrimaryKey size does not match schema!"); - for (size_t i{0}; i < schema->second.size(); ++i) { - ret.emplace(schema->second[i].property_id, FromPropertyValueToValue(std::move(pk[i]))); + auto pks = PrimaryKeysFromAccessor(acc, view, schema); + if (pks) { + ret.merge(*pks); } return ret; @@ -190,7 +201,9 @@ std::optional FillUpSourceVertex(const std::optional> FillUpSourceVertexProperties(const std::optional &v_acc, - const msgs::ExpandOneRequest &req) { + const msgs::ExpandOneRequest &req, + storage::v3::View view, + const Schemas::Schema *schema) { std::map src_vertex_properties; if (!req.src_vertex_properties) { @@ -204,6 +217,10 @@ std::optional> FillUpSourceVertexProperties(const st for (auto &[key, val] : props.GetValue()) { src_vertex_properties.insert(std::make_pair(key, FromPropertyValueToValue(std::move(val)))); } + auto pks = PrimaryKeysFromAccessor(*v_acc, view, schema); + if (pks) { + src_vertex_properties.merge(*pks); + } } else if (req.src_vertex_properties.value().empty()) { // NOOP @@ -264,7 +281,6 @@ std::optional, 2>> FillUpConnectingEdges( return std::nullopt; } in_edges = maybe_filter_based_on_edge_uniquness(std::move(in_edges_result.GetValue()), msgs::EdgeDirection::IN); - auto out_edges_result = v_acc->OutEdges(View::NEW, edge_types); if (out_edges_result.HasError()) { spdlog::debug("Encountered an error while trying to get out-going EdgeAccessors. Transaction id: {}", @@ -303,7 +319,8 @@ bool FillEdges(const std::vector &edges, msgs::ExpandOneResultRow std::optional GetExpandOneResult( Shard::Accessor &acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, - const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) { + const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, + const Schemas::Schema *schema) { /// Fill up source vertex const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second)); auto v_acc = acc.FindVertex(primary_key, View::NEW); @@ -312,9 +329,9 @@ std::optional GetExpandOneResult( if (!source_vertex) { return std::nullopt; } + std::optional> src_vertex_properties; + src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); - /// Fill up source vertex properties - auto src_vertex_properties = FillUpSourceVertexProperties(v_acc, req); if (!src_vertex_properties) { return std::nullopt; } @@ -852,7 +869,8 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { continue; } } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); + auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler, + shard_->GetSchema(shard_->PrimaryLabel())); if (!result) { action_successful = false; diff --git a/src/utils/print_helpers.hpp b/src/utils/print_helpers.hpp index eeb18e061..05c07fdc2 100644 --- a/src/utils/print_helpers.hpp +++ b/src/utils/print_helpers.hpp @@ -33,8 +33,9 @@ std::ostream &operator<<(std::ostream &in, const std::vector &vector) { return in; } -template -std::ostream &operator<<(std::ostream &in, const std::map &map) { +namespace detail { +template +std::ostream &MapImpl(std::ostream &in, const T &map) { in << "{"; bool first = true; for (const auto &[a, b] : map) { @@ -49,6 +50,17 @@ std::ostream &operator<<(std::ostream &in, const std::map &map) { in << "}"; return in; } +} // namespace detail + +template +std::ostream &operator<<(std::ostream &in, const std::map &map) { + return detail::MapImpl(in, map); +} + +template +std::ostream &operator<<(std::ostream &in, const std::unordered_map &map) { + return detail::MapImpl(in, map); +} template std::ostream &operator<<(std::ostream &in, const std::unordered_map &map) { diff --git a/tests/e2e/distributed_queries/CMakeLists.txt b/tests/e2e/distributed_queries/CMakeLists.txt index 16dc84ab6..9a2c48c63 100644 --- a/tests/e2e/distributed_queries/CMakeLists.txt +++ b/tests/e2e/distributed_queries/CMakeLists.txt @@ -3,3 +3,8 @@ function(distributed_queries_e2e_python_files FILE_NAME) endfunction() distributed_queries_e2e_python_files(distributed_queries.py) +distributed_queries_e2e_python_files(unwind_collect.py) +distributed_queries_e2e_python_files(order_by_and_limit.py) +distributed_queries_e2e_python_files(distinct.py) +distributed_queries_e2e_python_files(optional_match.py) +distributed_queries_e2e_python_files(common.py) diff --git a/tests/e2e/distributed_queries/common.py b/tests/e2e/distributed_queries/common.py new file mode 100644 index 000000000..3588a803c --- /dev/null +++ b/tests/e2e/distributed_queries/common.py @@ -0,0 +1,44 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing +import mgclient +import sys +import pytest +import time + + +@pytest.fixture(autouse=True) +def connection(): + connection = connect() + return connection + + +def connect(**kwargs) -> mgclient.Connection: + connection = mgclient.connect(host="localhost", port=7687, **kwargs) + connection.autocommit = True + return connection + + +def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: + cursor.execute(query, params) + return cursor.fetchall() + + +def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int): + results = execute_and_fetch_all(cursor, query) + return len(results) == n + + +def wait_for_shard_manager_to_initialize(): + # The ShardManager in memgraph takes some time to initialize + # the shards, thus we cannot just run the queries right away + time.sleep(3) diff --git a/tests/e2e/distributed_queries/distinct.py b/tests/e2e/distributed_queries/distinct.py new file mode 100644 index 000000000..9ebe50e6f --- /dev/null +++ b/tests/e2e/distributed_queries/distinct.py @@ -0,0 +1,38 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing +import mgclient +import sys +import pytest +import time +from common import * + + +def test_distinct(connection): + wait_for_shard_manager_to_initialize() + cursor = connection.cursor() + + assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0) + assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0) + assert has_n_result_row(cursor, "MATCH (n)-[r]->(m) RETURN r", 4) + + results = execute_and_fetch_all(cursor, "MATCH (n)-[r]->(m) RETURN DISTINCT m") + assert len(results) == 2 + for i, n in enumerate(results): + n_props = n[0].properties + assert len(n_props) == 1 + assert n_props["property"] == i + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/distributed_queries/distributed_queries.py b/tests/e2e/distributed_queries/distributed_queries.py index 68ebdef8d..b4b6324d9 100644 --- a/tests/e2e/distributed_queries/distributed_queries.py +++ b/tests/e2e/distributed_queries/distributed_queries.py @@ -14,34 +14,7 @@ import mgclient import sys import pytest import time - - -@pytest.fixture(autouse=True) -def connection(): - connection = connect() - return connection - - -def connect(**kwargs) -> mgclient.Connection: - connection = mgclient.connect(host="localhost", port=7687, **kwargs) - connection.autocommit = True - return connection - - -def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: - cursor.execute(query, params) - return cursor.fetchall() - - -def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int): - results = execute_and_fetch_all(cursor, query) - return len(results) == n - - -def wait_for_shard_manager_to_initialize(): - # The ShardManager in memgraph takes some time to initialize - # the shards, thus we cannot just run the queries right away - time.sleep(3) +from common import * def test_vertex_creation_and_scanall(connection): @@ -62,7 +35,7 @@ def test_vertex_creation_and_scanall(connection): assert len(results) == 9 for (n, r, m) in results: n_props = n.properties - assert len(n_props) == 0, "n is not expected to have properties, update the test!" + assert len(n_props) == 1, "n is not expected to have properties, update the test!" assert len(n.labels) == 0, "n is not expected to have labels, update the test!" assert r.type == "TO" diff --git a/tests/e2e/distributed_queries/optional_match.py b/tests/e2e/distributed_queries/optional_match.py new file mode 100644 index 000000000..731c36181 --- /dev/null +++ b/tests/e2e/distributed_queries/optional_match.py @@ -0,0 +1,37 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing +import mgclient +import sys +import pytest +import time +from common import * + + +def test_optional_match(connection): + wait_for_shard_manager_to_initialize() + cursor = connection.cursor() + + assert has_n_result_row(cursor, "CREATE (n :label {property:0})", 0) + + results = execute_and_fetch_all( + cursor, "MATCH (n:label) OPTIONAL MATCH (n:label)-[:TO]->(parent:label) RETURN parent" + ) + assert len(results) == 1 + + assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0) + assert has_n_result_row(cursor, "MATCH (n), (m) CREATE (n)-[:TO]->(m)", 0) + assert has_n_result_row(cursor, "MATCH (n:label) OPTIONAL MATCH (n)-[r:TO]->(m:label) RETURN r", 4) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/distributed_queries/order_by_and_limit.py b/tests/e2e/distributed_queries/order_by_and_limit.py new file mode 100644 index 000000000..05297f8f6 --- /dev/null +++ b/tests/e2e/distributed_queries/order_by_and_limit.py @@ -0,0 +1,44 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing +import mgclient +import sys +import pytest +import time +from common import * + + +def test_order_by_and_limit(connection): + wait_for_shard_manager_to_initialize() + cursor = connection.cursor() + + assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0) + + results = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property DESC") + assert len(results) == 4 + i = 4 + for n in results: + n_props = n[0].properties + assert len(n_props) == 1 + assert n_props["property"] == i + i = i - 1 + + result = execute_and_fetch_all(cursor, "MATCH (n) RETURN n ORDER BY n.property LIMIT 1") + assert len(result) == 1 + assert result[0][0].properties["property"] == 1 + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/distributed_queries/unwind_collect.py b/tests/e2e/distributed_queries/unwind_collect.py new file mode 100644 index 000000000..b51ac9907 --- /dev/null +++ b/tests/e2e/distributed_queries/unwind_collect.py @@ -0,0 +1,34 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import typing +import mgclient +import sys +import pytest +import time +from common import * + + +def test_collect_unwind(connection): + wait_for_shard_manager_to_initialize() + cursor = connection.cursor() + + assert has_n_result_row(cursor, "CREATE (n :label {property:1})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:2})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:3})", 0) + assert has_n_result_row(cursor, "CREATE (n :label {property:4})", 0) + + assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS result RETURN result", 1) + assert has_n_result_row(cursor, "MATCH (n) WITH collect(n) AS nd UNWIND nd AS result RETURN result", 4) + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/distributed_queries/workloads.yaml b/tests/e2e/distributed_queries/workloads.yaml index 083fb91ca..c4ba13500 100644 --- a/tests/e2e/distributed_queries/workloads.yaml +++ b/tests/e2e/distributed_queries/workloads.yaml @@ -11,3 +11,23 @@ workloads: binary: "tests/e2e/pytest_runner.sh" args: ["distributed_queries/distributed_queries.py"] <<: *template_cluster + + - name: "Distributed unwind collect" + binary: "tests/e2e/pytest_runner.sh" + args: ["distributed_queries/unwind_collect.py"] + <<: *template_cluster + + - name: "Distributed order by and limit" + binary: "tests/e2e/pytest_runner.sh" + args: ["distributed_queries/order_by_and_limit.py"] + <<: *template_cluster + + - name: "Distributed distinct" + binary: "tests/e2e/pytest_runner.sh" + args: ["distributed_queries/distinct.py"] + <<: *template_cluster + + - name: "Distributed optional match" + binary: "tests/e2e/pytest_runner.sh" + args: ["distributed_queries/optional_match.py"] + <<: *template_cluster diff --git a/tests/setup.sh b/tests/setup.sh index a74ee6ff9..4b69bbf1c 100755 --- a/tests/setup.sh +++ b/tests/setup.sh @@ -12,7 +12,7 @@ PIP_DEPS=( "neo4j-driver==4.1.1" "parse==1.18.0" "parse-type==0.5.2" - "pytest==6.2.3" + "pytest==6.2.5" "pyyaml==5.4.1" "six==1.15.0" ) @@ -36,12 +36,12 @@ PYTHON_MINOR=$(python3 -c 'import sys; print(sys.version_info[:][1])') # NOTE (2021-11-15): PyPi doesn't contain pulsar-client for Python 3.9 so we have to use # our manually built wheel file. When they update the repository, pulsar-client can be # added as a regular PIP dependancy -if [ $PYTHON_MINOR -lt 9 ]; then - pip --timeout 1000 install "pulsar-client==2.8.1" -else - pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl -fi - +#if [ $PYTHON_MINOR -lt 9 ]; then +# pip --timeout 1000 install "pulsar-client==2.8.1" +#else +# pip --timeout 1000 install https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/pulsar_client-2.8.1-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.whl +#fi +# for pkg in "${PIP_DEPS[@]}"; do pip --timeout 1000 install "$pkg" done