diff --git a/src/glue/v2/communication.cpp b/src/glue/v2/communication.cpp index 3406105b0..42228652f 100644 --- a/src/glue/v2/communication.cpp +++ b/src/glue/v2/communication.cpp @@ -18,8 +18,8 @@ #include "common/errors.hpp" #include "coordinator/shard_map.hpp" #include "query/v2/accessors.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/edge_accessor.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/shard.hpp" @@ -72,7 +72,7 @@ query::v2::TypedValue ToTypedValue(const Value &value) { } communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View /*view*/) { auto id = communication::bolt::Id::FromUint(0); @@ -80,44 +80,44 @@ communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAcces std::vector new_labels; new_labels.reserve(labels.size()); for (const auto &label : labels) { - new_labels.push_back(shard_request_manager->LabelToName(label.id)); + new_labels.push_back(request_router->LabelToName(label.id)); } auto properties = vertex.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); + new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Vertex{id, new_labels, new_properties}; } communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication auto id = communication::bolt::Id::FromUint(0); auto from = communication::bolt::Id::FromUint(0); auto to = communication::bolt::Id::FromUint(0); - const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType()); + const auto &type = request_router->EdgeTypeToName(edge.EdgeType()); auto properties = edge.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); + new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Edge{id, from, to, type, new_properties}; } communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/, - const query::v2::ShardRequestManagerInterface * /*shard_request_manager*/, + const query::v2::RequestRouterInterface * /*request_router*/, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication MG_ASSERT(false, "Path is unimplemented!"); return {}; } -Value ToBoltValue(const query::v2::TypedValue &value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) { +Value ToBoltValue(const query::v2::TypedValue &value, const query::v2::RequestRouterInterface *request_router, + storage::v3::View view) { switch (value.type()) { case query::v2::TypedValue::Type::Null: return {}; @@ -133,7 +133,7 @@ Value ToBoltValue(const query::v2::TypedValue &value, std::vector values; values.reserve(value.ValueList().size()); for (const auto &v : value.ValueList()) { - auto value = ToBoltValue(v, shard_request_manager, view); + auto value = ToBoltValue(v, request_router, view); values.emplace_back(std::move(value)); } return {std::move(values)}; @@ -141,21 +141,21 @@ Value ToBoltValue(const query::v2::TypedValue &value, case query::v2::TypedValue::Type::Map: { std::map map; for (const auto &kv : value.ValueMap()) { - auto value = ToBoltValue(kv.second, shard_request_manager, view); + auto value = ToBoltValue(kv.second, request_router, view); map.emplace(kv.first, std::move(value)); } return {std::move(map)}; } case query::v2::TypedValue::Type::Vertex: { - auto vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view); + auto vertex = ToBoltVertex(value.ValueVertex(), request_router, view); return {std::move(vertex)}; } case query::v2::TypedValue::Type::Edge: { - auto edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view); + auto edge = ToBoltEdge(value.ValueEdge(), request_router, view); return {std::move(edge)}; } case query::v2::TypedValue::Type::Path: { - auto path = ToBoltPath(value.ValuePath(), shard_request_manager, view); + auto path = ToBoltPath(value.ValuePath(), request_router, view); return {std::move(path)}; } case query::v2::TypedValue::Type::Date: diff --git a/src/glue/v2/communication.hpp b/src/glue/v2/communication.hpp index 817ff02bd..c1661b521 100644 --- a/src/glue/v2/communication.hpp +++ b/src/glue/v2/communication.hpp @@ -15,7 +15,7 @@ #include "communication/bolt/v1/value.hpp" #include "coordinator/shard_map.hpp" #include "query/v2/bindings/typed_value.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/result.hpp" #include "storage/v3/shard.hpp" @@ -32,40 +32,37 @@ namespace memgraph::glue::v2 { /// @param storage::v3::VertexAccessor for converting to /// communication::bolt::Vertex. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting label and property names. +/// @param query::v2::RequestRouterInterface *request_router getting label and property names. /// @param storage::v3::View for deciding which vertex attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. +/// @param query::v2::RequestRouterInterface *request_router getting edge type and property names. /// @param storage::v3::View for deciding which edge attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param query::v2::Path for converting to communication::bolt::Path. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param query::v2::TypedValue for converting to communication::bolt::Value. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value); @@ -75,8 +72,7 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val communication::bolt::Value ToBoltValue(msgs::Value value); -communication::bolt::Value ToBoltValue(msgs::Value value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, +communication::bolt::Value ToBoltValue(msgs::Value value, const query::v2::RequestRouterInterface *request_router, storage::v3::View view); } // namespace memgraph::glue::v2 diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 565f9940c..d825cc0e7 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -454,7 +454,7 @@ class BoltSession final : public memgraph::communication::bolt::Session Pull(TEncoder *encoder, std::optional n, std::optional qid) override { - TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager()); + TypedValueResultStream stream(encoder, interpreter_.GetRequestRouter()); return PullResults(stream, n, qid); } @@ -481,7 +481,7 @@ class BoltSession final : public memgraph::communication::bolt::Session decoded_summary; for (const auto &kv : summary) { - auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(), + auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetRequestRouter(), memgraph::storage::v3::View::NEW); decoded_summary.emplace(kv.first, std::move(bolt_value)); } @@ -497,15 +497,14 @@ class BoltSession final : public memgraph::communication::bolt::Session &values) { std::vector decoded_values; decoded_values.reserve(values.size()); for (const auto &v : values) { - auto bolt_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW); + auto bolt_value = memgraph::glue::v2::ToBoltValue(v, request_router_, memgraph::storage::v3::View::NEW); decoded_values.emplace_back(std::move(bolt_value)); } encoder_->MessageRecord(decoded_values); @@ -513,7 +512,7 @@ class BoltSession final : public memgraph::communication::bolt::Session> &EdgeAccessor::Properties() const { return edge.properties; } Value EdgeAccessor::GetProperty(const std::string &prop_name) const { - auto maybe_prop = manager_->MaybeNameToProperty(prop_name); + auto maybe_prop = request_router_->MaybeNameToProperty(prop_name); if (!maybe_prop) { return {}; } @@ -39,20 +39,20 @@ bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; }; size_t EdgeAccessor::CypherId() const { return edge.id.gid; } VertexAccessor EdgeAccessor::To() const { - return VertexAccessor(Vertex{edge.dst}, std::vector>{}, manager_); + return VertexAccessor(Vertex{edge.dst}, std::vector>{}, request_router_); } VertexAccessor EdgeAccessor::From() const { - return VertexAccessor(Vertex{edge.src}, std::vector>{}, manager_); + return VertexAccessor(Vertex{edge.src}, std::vector>{}, request_router_); } VertexAccessor::VertexAccessor(Vertex v, std::vector> props, - const ShardRequestManagerInterface *manager) - : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} + const RequestRouterInterface *request_router) + : vertex(std::move(v)), properties(std::move(props)), request_router_(request_router) {} VertexAccessor::VertexAccessor(Vertex v, std::map &&props, - const ShardRequestManagerInterface *manager) - : vertex(std::move(v)), manager_(manager) { + const RequestRouterInterface *request_router) + : vertex(std::move(v)), request_router_(request_router) { properties.reserve(props.size()); for (auto &[id, value] : props) { properties.emplace_back(std::make_pair(id, std::move(value))); @@ -60,8 +60,8 @@ VertexAccessor::VertexAccessor(Vertex v, std::map &&props, } VertexAccessor::VertexAccessor(Vertex v, const std::map &props, - const ShardRequestManagerInterface *manager) - : vertex(std::move(v)), manager_(manager) { + const RequestRouterInterface *request_router) + : vertex(std::move(v)), request_router_(request_router) { properties.reserve(props.size()); for (const auto &[id, value] : props) { properties.emplace_back(std::make_pair(id, value)); @@ -91,7 +91,7 @@ Value VertexAccessor::GetProperty(PropertyId prop_id) const { // NOLINTNEXTLINE(readability-convert-member-functions-to-static) Value VertexAccessor::GetProperty(const std::string &prop_name) const { - auto maybe_prop = manager_->MaybeNameToProperty(prop_name); + auto maybe_prop = request_router_->MaybeNameToProperty(prop_name); if (!maybe_prop) { return {}; } diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index 48578d217..221c9ffc4 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -25,7 +25,7 @@ #include "utils/memory_tracker.hpp" namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; } // namespace memgraph::query::v2 namespace memgraph::query::v2::accessors { @@ -41,7 +41,7 @@ class VertexAccessor; class EdgeAccessor final { public: - explicit EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager); + explicit EdgeAccessor(Edge edge, const RequestRouterInterface *request_router); [[nodiscard]] EdgeTypeId EdgeType() const; @@ -64,7 +64,7 @@ class EdgeAccessor final { private: Edge edge; - const ShardRequestManagerInterface *manager_; + const RequestRouterInterface *request_router_; }; class VertexAccessor final { @@ -73,10 +73,10 @@ class VertexAccessor final { using Label = msgs::Label; using VertexId = msgs::VertexId; VertexAccessor(Vertex v, std::vector> props, - const ShardRequestManagerInterface *manager); + const RequestRouterInterface *request_router); - VertexAccessor(Vertex v, std::map &&props, const ShardRequestManagerInterface *manager); - VertexAccessor(Vertex v, const std::map &props, const ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *request_router); + VertexAccessor(Vertex v, const std::map &props, const RequestRouterInterface *request_router); [[nodiscard]] Label PrimaryLabel() const; @@ -108,7 +108,7 @@ class VertexAccessor final { private: Vertex vertex; std::vector> properties; - const ShardRequestManagerInterface *manager_; + const RequestRouterInterface *request_router_; }; // Highly mocked interface. Won't work if used. diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp index 02e27b22a..0a0d3ca0a 100644 --- a/src/query/v2/bindings/eval.hpp +++ b/src/query/v2/bindings/eval.hpp @@ -26,7 +26,7 @@ namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; namespace detail { class Callable { @@ -34,15 +34,14 @@ class Callable { auto operator()(const storage::v3::PropertyValue &val) const { return storage::v3::PropertyToTypedValue(val); }; - auto operator()(const msgs::Value &val, ShardRequestManagerInterface *manager) const { - return ValueToTypedValue(val, manager); + auto operator()(const msgs::Value &val, RequestRouterInterface *request_router) const { + return ValueToTypedValue(val, request_router); }; }; } // namespace detail -using ExpressionEvaluator = - expr::ExpressionEvaluator; +using ExpressionEvaluator = expr::ExpressionEvaluator; } // namespace memgraph::query::v2 diff --git a/src/query/v2/context.hpp b/src/query/v2/context.hpp index 388342349..cb30a9ced 100644 --- a/src/query/v2/context.hpp +++ b/src/query/v2/context.hpp @@ -20,7 +20,7 @@ #include "query/v2/parameters.hpp" #include "query/v2/plan/profile.hpp" //#include "query/v2/trigger.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/async_timer.hpp" namespace memgraph::query::v2 { @@ -61,26 +61,26 @@ struct EvaluationContext { }; inline std::vector NamesToProperties(const std::vector &property_names, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { std::vector properties; // TODO Fix by using reference properties.reserve(property_names.size()); - if (shard_request_manager != nullptr) { + if (request_router != nullptr) { for (const auto &name : property_names) { - properties.push_back(shard_request_manager->NameToProperty(name)); + properties.push_back(request_router->NameToProperty(name)); } } return properties; } inline std::vector NamesToLabels(const std::vector &label_names, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { std::vector labels; labels.reserve(label_names.size()); // TODO Fix by using reference - if (shard_request_manager != nullptr) { + if (request_router != nullptr) { for (const auto &name : label_names) { - labels.push_back(shard_request_manager->NameToLabel(name)); + labels.push_back(request_router->NameToLabel(name)); } } return labels; @@ -97,7 +97,7 @@ struct ExecutionContext { plan::ProfilingStats *stats_root{nullptr}; ExecutionStats execution_stats; utils::AsyncTimer timer; - ShardRequestManagerInterface *shard_request_manager{nullptr}; + RequestRouterInterface *request_router{nullptr}; IdAllocator *edge_ids_alloc; }; diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index a1db5ed17..6db972cda 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -12,12 +12,12 @@ #pragma once #include "bindings/typed_value.hpp" #include "query/v2/accessors.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" namespace memgraph::query::v2 { -inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManagerInterface *manager) { +inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInterface *request_router) { using Value = msgs::Value; switch (value.type) { case Value::Type::Null: @@ -35,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManage std::vector dst; dst.reserve(lst.size()); for (const auto &elem : lst) { - dst.push_back(ValueToTypedValue(elem, manager)); + dst.push_back(ValueToTypedValue(elem, request_router)); } return TypedValue(std::move(dst)); } @@ -43,21 +43,21 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManage const auto &value_map = value.map_v; std::map dst; for (const auto &[key, val] : value_map) { - dst[key] = ValueToTypedValue(val, manager); + dst[key] = ValueToTypedValue(val, request_router); } return TypedValue(std::move(dst)); } case Value::Type::Vertex: return TypedValue(accessors::VertexAccessor( - value.vertex_v, std::vector>{}, manager)); + value.vertex_v, std::vector>{}, request_router)); case Value::Type::Edge: - return TypedValue(accessors::EdgeAccessor(value.edge_v, manager)); + return TypedValue(accessors::EdgeAccessor(value.edge_v, request_router)); } throw std::runtime_error("Incorrect type in conversion"); } -inline const auto ValueToTypedValueFunctor = [](const msgs::Value &value, ShardRequestManagerInterface *manager) { - return ValueToTypedValue(value, manager); +inline const auto ValueToTypedValueFunctor = [](const msgs::Value &value, RequestRouterInterface *request_router) { + return ValueToTypedValue(value, request_router); }; inline msgs::Value TypedValueToValue(const TypedValue &value) { diff --git a/src/query/v2/cypher_query_interpreter.cpp b/src/query/v2/cypher_query_interpreter.cpp index 908ee36cb..f3f8e48d7 100644 --- a/src/query/v2/cypher_query_interpreter.cpp +++ b/src/query/v2/cypher_query_interpreter.cpp @@ -11,7 +11,7 @@ #include "query/v2/cypher_query_interpreter.hpp" #include "query/v2/bindings/symbol_generator.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); @@ -118,9 +118,9 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers) { - auto vertex_counts = plan::MakeVertexCountCache(shard_manager); + auto vertex_counts = plan::MakeVertexCountCache(request_router); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner); @@ -130,7 +130,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers) { std::optional::Accessor> plan_cache_access; if (plan_cache) { @@ -146,7 +146,7 @@ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_stor } auto plan = std::make_shared( - MakeLogicalPlan(std::move(ast_storage), query, parameters, shard_manager, predefined_identifiers)); + MakeLogicalPlan(std::move(ast_storage), query, parameters, request_router, predefined_identifiers)); if (plan_cache_access) { plan_cache_access->insert({hash, plan}); } diff --git a/src/query/v2/cypher_query_interpreter.hpp b/src/query/v2/cypher_query_interpreter.hpp index b7f63ab8f..688e52fed 100644 --- a/src/query/v2/cypher_query_interpreter.hpp +++ b/src/query/v2/cypher_query_interpreter.hpp @@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan { }; std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers); /** @@ -145,7 +145,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery */ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers = {}); } // namespace memgraph::query::v2 diff --git a/src/query/v2/frontend/ast/ast.lcp b/src/query/v2/frontend/ast/ast.lcp index dadcc9fa7..03c0c99f0 100644 --- a/src/query/v2/frontend/ast/ast.lcp +++ b/src/query/v2/frontend/ast/ast.lcp @@ -24,7 +24,7 @@ #include "query/v2/bindings/typed_value.hpp" #include "query/v2/db_accessor.hpp" #include "query/v2/path.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/typeinfo.hpp" #include "query/v2/conversions.hpp" @@ -838,14 +838,14 @@ cpp<# :slk-load (slk-load-ast-vector "Expression")) (function-name "std::string" :scope :public) (function "std::function &)>" + const functions::FunctionContext &)>" :scope :public :dont-save t :clone :copy :slk-load (lambda (member) #>cpp self->${member} = functions::NameToFunction, + functions::FunctionContext, functions::QueryEngineTag, decltype(ValueToTypedValueFunctor)>(self->function_name_); cpp<#))) (:public @@ -869,7 +869,7 @@ cpp<# const std::vector &arguments) : arguments_(arguments), function_name_(function_name), - function_(functions::NameToFunction, + function_(functions::NameToFunction, functions::QueryEngineTag, decltype(ValueToTypedValueFunctor)>(function_name_)) { if (!function_) { throw SemanticException("Function '{}' doesn't exist.", function_name); diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp index 1fd351cd8..8ca3eacb3 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.hpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp @@ -22,7 +22,7 @@ namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; namespace { const char kStartsWith[] = "STARTSWITH"; @@ -32,9 +32,9 @@ const char kId[] = "ID"; } // namespace struct FunctionContext { - // TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage. + // TODO(kostasrim) consider optional here. RequestRouter does not exist on the storage. // DbAccessor *db_accessor; - ShardRequestManagerInterface *manager; + RequestRouterInterface *request_router; utils::MemoryResource *memory; int64_t timestamp; std::unordered_map *counters; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index aa4d39de3..05ad51a4b 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -45,7 +45,7 @@ #include "query/v2/plan/planner.hpp" #include "query/v2/plan/profile.hpp" #include "query/v2/plan/vertex_count_cache.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/shard.hpp" #include "utils/algorithm.hpp" @@ -144,7 +144,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *request_router) { // Empty frame for evaluation of password expression. This is OK since // password should be either null or string literal and it's evaluation // should not depend on frame. @@ -155,7 +155,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); std::string username = auth_query->user_; std::string rolename = auth_query->role_; @@ -313,7 +313,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, - InterpreterContext *interpreter_context, ShardRequestManagerInterface *manager, + InterpreterContext *interpreter_context, RequestRouterInterface *request_router, std::vector *notifications) { expr::Frame frame(0); SymbolTable symbol_table; @@ -322,7 +322,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); Callback callback; switch (repl_query->action_) { @@ -449,7 +449,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *request_router) { expr::Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -459,7 +459,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶m std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) .count(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); Callback callback; switch (setting_query->action_) { @@ -650,7 +650,7 @@ struct PullPlanVector { struct PullPlan { explicit PullPlan(std::shared_ptr plan, const Parameters ¶meters, bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager = nullptr, + RequestRouterInterface *request_router = nullptr, // TriggerContextCollector *trigger_context_collector = nullptr, std::optional memory_limit = {}); std::optional Pull(AnyStream *stream, std::optional n, @@ -684,7 +684,7 @@ struct PullPlan { PullPlan::PullPlan(const std::shared_ptr plan, const Parameters ¶meters, const bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager, const std::optional memory_limit) + RequestRouterInterface *request_router, const std::optional memory_limit) : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), @@ -694,14 +694,14 @@ PullPlan::PullPlan(const std::shared_ptr plan, const Parameters &par ctx_.symbol_table = plan->symbol_table(); ctx_.evaluation_context.timestamp = QueryTimestamp(); ctx_.evaluation_context.parameters = parameters; - ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, shard_request_manager); - ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, shard_request_manager); + ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, request_router); + ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, request_router); if (interpreter_context->config.execution_timeout_sec > 0) { ctx_.timer = utils::AsyncTimer{interpreter_context->config.execution_timeout_sec}; } ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_profile_query = is_profile_query; - ctx_.shard_request_manager = shard_request_manager; + ctx_.request_router = request_router; ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc; } @@ -917,7 +917,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_ auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; auto query_io = interpreter_context_->io.ForkLocal(random_uuid); - shard_request_manager_ = std::make_unique>( + request_router_ = std::make_unique>( coordinator::CoordinatorClient( query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), std::move(query_io)); @@ -994,7 +994,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, std::vector *notifications, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { // TriggerContextCollector *trigger_context_collector = nullptr) { auto *cypher_query = utils::Downcast(parsed_query.query); @@ -1003,8 +1003,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapmemory_limit_, cypher_query->memory_scale_); if (memory_limit) { @@ -1019,9 +1018,9 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapplan_cache : nullptr, shard_request_manager); + auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, + parsed_query.parameters, + parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); summary->insert_or_assign("cost_estimate", plan->cost()); auto rw_type_checker = plan::ReadWriteTypeChecker(); @@ -1040,7 +1039,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map(plan, parsed_query.parameters, false, dba, interpreter_context, - execution_memory, shard_request_manager, memory_limit); + execution_memory, request_router, memory_limit); // execution_memory, trigger_context_collector, memory_limit); return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( @@ -1054,8 +1053,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, - InterpreterContext *interpreter_context, - ShardRequestManagerInterface *shard_request_manager, + InterpreterContext *interpreter_context, RequestRouterInterface *request_router, utils::MemoryResource *execution_memory) { const std::string kExplainQueryStart = "explain "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart), @@ -1074,20 +1072,20 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map(parsed_inner_query.query); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN"); - auto cypher_query_plan = CypherQueryToPlan( - parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, - parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, - shard_request_manager); + auto cypher_query_plan = + CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), + cypher_query, parsed_inner_query.parameters, + parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); std::stringstream printed_plan; - plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan); + plan::PrettyPrint(*request_router, &cypher_query_plan->plan(), &printed_plan); std::vector> printed_plan_rows; for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) { printed_plan_rows.push_back(std::vector{TypedValue(row)}); } - summary->insert_or_assign("explain", plan::PlanToJson(*shard_request_manager, &cypher_query_plan->plan()).dump()); + summary->insert_or_assign("explain", plan::PlanToJson(*request_router, &cypher_query_plan->plan()).dump()); return PreparedQuery{{"QUERY PLAN"}, std::move(parsed_query.required_privileges), @@ -1104,7 +1102,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager = nullptr) { + RequestRouterInterface *request_router = nullptr) { const std::string kProfileQueryStart = "profile "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart), @@ -1147,22 +1145,21 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra EvaluationContext evaluation_context; evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parsed_inner_query.parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); const auto memory_limit = expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_); - auto cypher_query_plan = CypherQueryToPlan( - parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, - parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, - shard_request_manager); + auto cypher_query_plan = + CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), + cypher_query, parsed_inner_query.parameters, + parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); auto rw_type_checker = plan::ReadWriteTypeChecker(); rw_type_checker.InferRWType(const_cast(cypher_query_plan->plan())); return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"}, std::move(parsed_query.required_privileges), [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), - summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager, + summary, dba, interpreter_context, execution_memory, memory_limit, request_router, // We want to execute the query we are profiling lazily, so we delay // the construction of the corresponding context. stats_and_total_time = std::optional{}, @@ -1171,7 +1168,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra // No output symbols are given so that nothing is streamed. if (!stats_and_total_time) { stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context, - execution_memory, shard_request_manager, memory_limit) + execution_memory, request_router, memory_limit) .Pull(stream, {}, {}, summary); pull_plan = std::make_shared(ProfilingStatsToTable(*stats_and_total_time)); } @@ -1298,14 +1295,14 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } auto *auth_query = utils::Downcast(parsed_query.query); - auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager); + auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, request_router); SymbolTable symbol_table; std::vector output_symbols; @@ -1334,14 +1331,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, std::vector *notifications, InterpreterContext *interpreter_context, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } auto *replication_query = utils::Downcast(parsed_query.query); - auto callback = - HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications); + auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, + request_router, notifications); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -1430,14 +1427,14 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli } PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw SettingConfigInMulticommandTxException{}; } auto *setting_query = utils::Downcast(parsed_query.query); MG_ASSERT(setting_query); - auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager); + auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, request_router); return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -1634,7 +1631,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (!in_explicit_transaction_ && (utils::Downcast(parsed_query.query) || utils::Downcast(parsed_query.query) || utils::Downcast(parsed_query.query))) { - shard_request_manager_->StartTransaction(); + request_router_->StartTransaction(); } utils::Timer planning_timer; @@ -1643,14 +1640,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, &*execution_db_accessor_, &query_execution->execution_memory, - &query_execution->notifications, shard_request_manager_.get()); + &query_execution->notifications, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, - &*shard_request_manager_, &query_execution->execution_memory_with_exception); + &*request_router_, &query_execution->execution_memory_with_exception); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareProfileQuery( - std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, - &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); + prepared_query = PrepareProfileQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, + interpreter_context_, &*execution_db_accessor_, + &query_execution->execution_memory_with_exception, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_, &query_execution->execution_memory); @@ -1658,9 +1655,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareAuthQuery( - std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, - &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); + prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, + interpreter_context_, &*execution_db_accessor_, + &query_execution->execution_memory_with_exception, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, interpreter_context_->db, @@ -1671,7 +1668,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, - interpreter_context_, shard_request_manager_.get()); + interpreter_context_, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, &*execution_db_accessor_); @@ -1688,8 +1685,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = - PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get()); + prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); } else if (utils::Downcast(parsed_query.query)) { @@ -1730,7 +1726,7 @@ void Interpreter::Commit() { // For now, we will not check if there are some unfinished queries. // We should document clearly that all results should be pulled to complete // a query. - shard_request_manager_->Commit(); + request_router_->Commit(); if (!db_accessor_) return; const auto reset_necessary_members = [this]() { diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index afc298a0c..985c9a90c 100644 --- a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -296,7 +296,7 @@ class Interpreter final { */ void Abort(); - const ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } + const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); } private: struct QueryExecution { @@ -342,7 +342,7 @@ class Interpreter final { // move this unique_ptr into a shared_ptr. std::unique_ptr db_accessor_; std::optional execution_db_accessor_; - std::unique_ptr shard_request_manager_; + std::unique_ptr request_router_; bool in_explicit_transaction_{false}; bool expect_rollback_{false}; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 81e466d82..b29ea1c47 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -39,8 +39,8 @@ #include "query/v2/frontend/ast/ast.hpp" #include "query/v2/path.hpp" #include "query/v2/plan/scoped_profile.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/conversions.hpp" #include "storage/v3/property_value.hpp" #include "utils/algorithm.hpp" @@ -177,10 +177,10 @@ class DistributedCreateNodeCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateNode"); if (input_cursor_->Pull(frame, context)) { - auto &shard_manager = context.shard_request_manager; + auto &request_router = context.request_router; { SCOPED_REQUEST_WAIT_PROFILE; - shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + request_router->Request(state_, NodeCreationInfoToRequest(context, frame)); } PlaceNodeOnTheFrame(frame, context); return true; @@ -197,8 +197,8 @@ class DistributedCreateNodeCursor : public Cursor { // TODO(kostasrim) Make this work with batching const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])}; - frame[nodes_info_.front()->symbol] = TypedValue( - query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager)); + frame[nodes_info_.front()->symbol] = + TypedValue(query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.request_router)); } std::vector NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { @@ -218,7 +218,7 @@ class DistributedCreateNodeCursor : public Cursor { if (const auto *node_info_properties = std::get_if(&node_info->properties)) { for (const auto &[key, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); - if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) { + if (context.request_router->IsPrimaryKey(primary_label, key)) { rqst.primary_key.push_back(TypedValueToValue(val)); pk.push_back(TypedValueToValue(val)); } @@ -227,8 +227,8 @@ class DistributedCreateNodeCursor : public Cursor { auto property_map = evaluator.Visit(*std::get(node_info->properties)).ValueMap(); for (const auto &[key, value] : property_map) { auto key_str = std::string(key); - auto property_id = context.shard_request_manager->NameToProperty(key_str); - if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) { + auto property_id = context.request_router->NameToProperty(key_str); + if (context.request_router->IsPrimaryKey(primary_label, property_id)) { rqst.primary_key.push_back(TypedValueToValue(value)); pk.push_back(TypedValueToValue(value)); } @@ -403,10 +403,10 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { + bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; - current_batch = shard_manager.Request(request_state_); + current_batch = request_router.Request(request_state_); } current_vertex_it = current_batch.begin(); return !current_batch.empty(); @@ -415,7 +415,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - auto &shard_manager = *context.shard_request_manager; + auto &request_router = *context.request_router; while (true) { if (MustAbort(context)) { throw HintedAbortError(); @@ -428,10 +428,11 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } - request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + request_state_.label = + label_.has_value() ? std::make_optional(request_router.LabelToName(*label_)) : std::nullopt; if (current_vertex_it == current_batch.end() && - (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context))) { + (request_state_.state == State::COMPLETED || !MakeRequest(request_router, context))) { ResetExecutionState(); continue; } @@ -713,7 +714,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) { // Like all filters, newly set values should not affect filtering of old // nodes and edges. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager, + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, storage::v3::View::OLD); while (input_cursor_->Pull(frame, context)) { if (EvaluateFilter(evaluator, self_.expression_)) return true; @@ -754,8 +755,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) { if (input_cursor_->Pull(frame, context)) { // Produce should always yield the latest results. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator); return true; @@ -1156,8 +1157,8 @@ class AggregateCursor : public Cursor { * aggregation results, and not on the number of inputs. */ void ProcessAll(Frame *frame, ExecutionContext *context) { - ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, - context->shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->request_router, + storage::v3::View::NEW); while (input_cursor_->Pull(*frame, *context)) { ProcessOne(*frame, &evaluator); } @@ -1377,8 +1378,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) { // First successful pull from the input, evaluate the skip expression. // The skip expression doesn't contain identifiers so graph view // parameter is not important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue to_skip = self_.expression_->Accept(evaluator); if (to_skip.type() != TypedValue::Type::Int) throw QueryRuntimeException("Number of elements to skip must be an integer."); @@ -1432,8 +1433,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) { if (limit_ == -1) { // Limit expression doesn't contain identifiers so graph view is not // important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue limit = self_.expression_->Accept(evaluator); if (limit.type() != TypedValue::Type::Int) throw QueryRuntimeException("Limit on number of returned elements must be an integer."); @@ -1488,8 +1489,8 @@ class OrderByCursor : public Cursor { SCOPED_PROFILE_OP("OrderBy"); if (!did_pull_all_) { - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); auto *mem = cache_.get_allocator().GetMemoryResource(); while (input_cursor_->Pull(frame, context)) { // collect the order_by elements @@ -1746,8 +1747,8 @@ class UnwindCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) return false; // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue input_value = self_.input_expression_->Accept(evaluator); if (input_value.type() != TypedValue::Type::List) throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); @@ -2258,7 +2259,7 @@ class LoadCsvCursor : public Cursor { Frame frame(0); SymbolTable symbol_table; auto evaluator = - ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator(&frame, symbol_table, eval_context, context.request_router, storage::v3::View::OLD); auto maybe_file = ToOptionalString(&evaluator, self_->file_); auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); @@ -2295,8 +2296,8 @@ class ForeachCursor : public Cursor { return false; } - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); TypedValue expr_result = expression->Accept(evaluator); if (expr_result.IsNull()) { @@ -2372,11 +2373,11 @@ class DistributedCreateExpandCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) { return false; } - auto &shard_manager = context.shard_request_manager; + auto &request_router = context.request_router; ResetExecutionState(); { SCOPED_REQUEST_WAIT_PROFILE; - shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + request_router->Request(state_, ExpandCreationInfoToRequest(context, frame)); } return true; } @@ -2413,7 +2414,7 @@ class DistributedCreateExpandCursor : public Cursor { // handle parameter auto property_map = evaluator.Visit(*std::get(edge_info.properties)).ValueMap(); for (const auto &[property, value] : property_map) { - const auto property_id = context.shard_request_manager->NameToProperty(std::string(property)); + const auto property_id = context.request_router->NameToProperty(std::string(property)); request.properties.emplace_back(property_id, storage::v3::TypedValueToValue(value)); } } @@ -2428,7 +2429,7 @@ class DistributedCreateExpandCursor : public Cursor { const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) { vertex_id.first = vertex.PrimaryLabel(); for (const auto &[key, val] : vertex.Properties()) { - if (context.shard_request_manager->IsPrimaryKey(vertex_id.first.id, key)) { + if (context.request_router->IsPrimaryKey(vertex_id.first.id, key)) { vertex_id.second.push_back(val); } } @@ -2508,11 +2509,11 @@ class DistributedExpandCursor : public Cursor { request.src_vertices.push_back(get_dst_vertex(edge, direction)); request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; ExecutionState request_state; - auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + auto result_rows = context.request_router->Request(request_state, std::move(request)); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); frame[self_.common_.node_symbol] = accessors::VertexAccessor( - msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager); + msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router); } bool InitEdges(Frame &frame, ExecutionContext &context) { @@ -2536,7 +2537,7 @@ class DistributedExpandCursor : public Cursor { ExecutionState request_state; auto result_rows = std::invoke([&context, &request_state, &request]() mutable { SCOPED_REQUEST_WAIT_PROFILE; - return context.shard_request_manager->Request(request_state, std::move(request)); + return context.request_router->Request(request_state, std::move(request)); }); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); @@ -2559,14 +2560,14 @@ class DistributedExpandCursor : public Cursor { case EdgeAtom::Direction::IN: { for (auto &edge : edge_messages) { edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}, - context.shard_request_manager); + context.request_router); } break; } case EdgeAtom::Direction::OUT: { for (auto &edge : edge_messages) { edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}, - context.shard_request_manager); + context.request_router); } break; } diff --git a/src/query/v2/plan/pretty_print.cpp b/src/query/v2/plan/pretty_print.cpp index b7ab6da6e..bc30a3890 100644 --- a/src/query/v2/plan/pretty_print.cpp +++ b/src/query/v2/plan/pretty_print.cpp @@ -14,13 +14,13 @@ #include "query/v2/bindings/pretty_print.hpp" #include "query/v2/db_accessor.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/string.hpp" namespace memgraph::query::v2::plan { -PlanPrinter::PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out) - : request_manager_(request_manager), out_(out) {} +PlanPrinter::PlanPrinter(const RequestRouterInterface *request_router, std::ostream *out) + : request_router_(request_router), out_(out) {} #define PRE_VISIT(TOp) \ bool PlanPrinter::PreVisit(TOp &) { \ @@ -34,7 +34,7 @@ bool PlanPrinter::PreVisit(CreateExpand &op) { WithPrintLn([&](auto &out) { out << "* CreateExpand (" << op.input_symbol_.name() << ")" << (op.edge_info_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" - << op.edge_info_.symbol.name() << ":" << request_manager_->EdgeTypeToName(op.edge_info_.edge_type) << "]" + << op.edge_info_.symbol.name() << ":" << request_router_->EdgeTypeToName(op.edge_info_.edge_type) << "]" << (op.edge_info_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.node_info_.symbol.name() << ")"; }); @@ -54,7 +54,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAll &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabel &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabel" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << ")"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << ")"; }); return true; } @@ -62,8 +62,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabel &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyValue &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelPropertyValue" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -71,8 +71,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyValue &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyRange &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelPropertyRange" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -80,8 +80,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyRange &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelProperty &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelProperty" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -100,7 +100,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::Expand &op) { << (op.common_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" << op.common_.edge_symbol.name(); utils::PrintIterable(*out_, op.common_.edge_types, "|", [this](auto &stream, const auto &edge_type) { - stream << ":" << request_manager_->EdgeTypeToName(edge_type); + stream << ":" << request_router_->EdgeTypeToName(edge_type); }); *out_ << "]" << (op.common_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.common_.node_symbol.name() << ")"; @@ -129,7 +129,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::ExpandVariable &op) { << (op.common_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" << op.common_.edge_symbol.name(); utils::PrintIterable(*out_, op.common_.edge_types, "|", [this](auto &stream, const auto &edge_type) { - stream << ":" << request_manager_->EdgeTypeToName(edge_type); + stream << ":" << request_router_->EdgeTypeToName(edge_type); }); *out_ << "]" << (op.common_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.common_.node_symbol.name() << ")"; @@ -263,15 +263,14 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string --depth_; } -void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, - std::ostream *out) { - PlanPrinter printer(&request_manager, out); +void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root, std::ostream *out) { + PlanPrinter printer(&request_router, out); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(printer); } -nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { - impl::PlanToJsonVisitor visitor(&request_manager); +nlohmann::json PlanToJson(const RequestRouterInterface &request_router, const LogicalOperator *plan_root) { + impl::PlanToJsonVisitor visitor(&request_router); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(visitor); return visitor.output(); @@ -349,16 +348,16 @@ json ToJson(const utils::Bound &bound) { json ToJson(const Symbol &symbol) { return symbol.name(); } -json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager) { - return request_manager.EdgeTypeToName(edge_type); +json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_router) { + return request_router.EdgeTypeToName(edge_type); } -json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager) { - return request_manager.LabelToName(label); +json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_router) { + return request_router.LabelToName(label); } -json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager) { - return request_manager.PropertyToName(property); +json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_router) { + return request_router.PropertyToName(property); } json ToJson(NamedExpression *nexpr) { @@ -369,29 +368,29 @@ json ToJson(NamedExpression *nexpr) { } json ToJson(const std::vector> &properties, - const ShardRequestManagerInterface &request_manager) { + const RequestRouterInterface &request_router) { json json; for (const auto &prop_pair : properties) { - json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second)); + json.emplace(ToJson(prop_pair.first, request_router), ToJson(prop_pair.second)); } return json; } -json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager) { +json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_router) { json self; self["symbol"] = ToJson(node_info.symbol); - self["labels"] = ToJson(node_info.labels, request_manager); + self["labels"] = ToJson(node_info.labels, request_router); const auto *props = std::get_if(&node_info.properties); - self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_manager); + self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_router); return self; } -json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager) { +json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_router) { json self; self["symbol"] = ToJson(edge_info.symbol); const auto *props = std::get_if(&edge_info.properties); - self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_manager); - self["edge_type"] = ToJson(edge_info.edge_type, request_manager); + self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_router); + self["edge_type"] = ToJson(edge_info.edge_type, request_router); self["direction"] = ToString(edge_info.direction); return self; } @@ -433,7 +432,7 @@ bool PlanToJsonVisitor::PreVisit(ScanAll &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabel &op) { json self; self["name"] = "ScanAllByLabel"; - self["label"] = ToJson(op.label_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); self["output_symbol"] = ToJson(op.output_symbol_); op.input_->Accept(*this); @@ -446,8 +445,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabel &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyRange &op) { json self; self["name"] = "ScanAllByLabelPropertyRange"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["lower_bound"] = op.lower_bound_ ? ToJson(*op.lower_bound_) : json(); self["upper_bound"] = op.upper_bound_ ? ToJson(*op.upper_bound_) : json(); self["output_symbol"] = ToJson(op.output_symbol_); @@ -462,8 +461,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyRange &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyValue &op) { json self; self["name"] = "ScanAllByLabelPropertyValue"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["expression"] = ToJson(op.expression_); self["output_symbol"] = ToJson(op.output_symbol_); @@ -477,8 +476,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyValue &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelProperty &op) { json self; self["name"] = "ScanAllByLabelProperty"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["output_symbol"] = ToJson(op.output_symbol_); op.input_->Accept(*this); @@ -501,7 +500,7 @@ bool PlanToJsonVisitor::PreVisit(ScanAllById &op) { bool PlanToJsonVisitor::PreVisit(CreateNode &op) { json self; self["name"] = "CreateNode"; - self["node_info"] = ToJson(op.node_info_, *request_manager_); + self["node_info"] = ToJson(op.node_info_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); @@ -514,8 +513,8 @@ bool PlanToJsonVisitor::PreVisit(CreateExpand &op) { json self; self["name"] = "CreateExpand"; self["input_symbol"] = ToJson(op.input_symbol_); - self["node_info"] = ToJson(op.node_info_, *request_manager_); - self["edge_info"] = ToJson(op.edge_info_, *request_manager_); + self["node_info"] = ToJson(op.node_info_, *request_router_); + self["edge_info"] = ToJson(op.edge_info_, *request_router_); self["existing_node"] = op.existing_node_; op.input_->Accept(*this); @@ -531,7 +530,7 @@ bool PlanToJsonVisitor::PreVisit(Expand &op) { self["input_symbol"] = ToJson(op.input_symbol_); self["node_symbol"] = ToJson(op.common_.node_symbol); self["edge_symbol"] = ToJson(op.common_.edge_symbol); - self["edge_types"] = ToJson(op.common_.edge_types, *request_manager_); + self["edge_types"] = ToJson(op.common_.edge_types, *request_router_); self["direction"] = ToString(op.common_.direction); self["existing_node"] = op.common_.existing_node; @@ -548,7 +547,7 @@ bool PlanToJsonVisitor::PreVisit(ExpandVariable &op) { self["input_symbol"] = ToJson(op.input_symbol_); self["node_symbol"] = ToJson(op.common_.node_symbol); self["edge_symbol"] = ToJson(op.common_.edge_symbol); - self["edge_types"] = ToJson(op.common_.edge_types, *request_manager_); + self["edge_types"] = ToJson(op.common_.edge_types, *request_router_); self["direction"] = ToString(op.common_.direction); self["type"] = ToString(op.type_); self["is_reverse"] = op.is_reverse_; @@ -623,7 +622,7 @@ bool PlanToJsonVisitor::PreVisit(Delete &op) { bool PlanToJsonVisitor::PreVisit(SetProperty &op) { json self; self["name"] = "SetProperty"; - self["property"] = ToJson(op.property_, *request_manager_); + self["property"] = ToJson(op.property_, *request_router_); self["lhs"] = ToJson(op.lhs_); self["rhs"] = ToJson(op.rhs_); @@ -660,7 +659,7 @@ bool PlanToJsonVisitor::PreVisit(SetLabels &op) { json self; self["name"] = "SetLabels"; self["input_symbol"] = ToJson(op.input_symbol_); - self["labels"] = ToJson(op.labels_, *request_manager_); + self["labels"] = ToJson(op.labels_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); @@ -672,7 +671,7 @@ bool PlanToJsonVisitor::PreVisit(SetLabels &op) { bool PlanToJsonVisitor::PreVisit(RemoveProperty &op) { json self; self["name"] = "RemoveProperty"; - self["property"] = ToJson(op.property_, *request_manager_); + self["property"] = ToJson(op.property_, *request_router_); self["lhs"] = ToJson(op.lhs_); op.input_->Accept(*this); @@ -686,7 +685,7 @@ bool PlanToJsonVisitor::PreVisit(RemoveLabels &op) { json self; self["name"] = "RemoveLabels"; self["input_symbol"] = ToJson(op.input_symbol_); - self["labels"] = ToJson(op.labels_, *request_manager_); + self["labels"] = ToJson(op.labels_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); diff --git a/src/query/v2/plan/pretty_print.hpp b/src/query/v2/plan/pretty_print.hpp index 8485723a3..4094d7c81 100644 --- a/src/query/v2/plan/pretty_print.hpp +++ b/src/query/v2/plan/pretty_print.hpp @@ -18,7 +18,7 @@ #include "query/v2/frontend/ast/ast.hpp" #include "query/v2/plan/operator.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" namespace memgraph::query::v2 { @@ -27,20 +27,19 @@ namespace plan { class LogicalOperator; /// Pretty print a `LogicalOperator` plan to a `std::ostream`. -/// ShardRequestManager is needed for resolving label and property names. +/// RequestRouter is needed for resolving label and property names. /// Note that `plan_root` isn't modified, but we can't take it as a const /// because we don't have support for visiting a const LogicalOperator. -void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, - std::ostream *out); +void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root, std::ostream *out); /// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`. -inline void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { - PrettyPrint(request_manager, plan_root, &std::cout); +inline void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root) { + PrettyPrint(request_router, plan_root, &std::cout); } /// Convert a `LogicalOperator` plan to a JSON representation. /// DbAccessor is needed for resolving label and property names. -nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root); +nlohmann::json PlanToJson(const RequestRouterInterface &request_router, const LogicalOperator *plan_root); class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { public: @@ -48,7 +47,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::Visit; - PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out); + PlanPrinter(const RequestRouterInterface *request_router, std::ostream *out); bool DefaultPreVisit() override; @@ -115,7 +114,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { void Branch(LogicalOperator &op, const std::string &branch_name = ""); int64_t depth_{0}; - const ShardRequestManagerInterface *request_manager_{nullptr}; + const RequestRouterInterface *request_router_{nullptr}; std::ostream *out_{nullptr}; }; @@ -133,20 +132,20 @@ nlohmann::json ToJson(const utils::Bound &bound); nlohmann::json ToJson(const Symbol &symbol); -nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_router); -nlohmann::json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_router); -nlohmann::json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_router); nlohmann::json ToJson(NamedExpression *nexpr); nlohmann::json ToJson(const std::vector> &properties, - const ShardRequestManagerInterface &request_manager); + const RequestRouterInterface &request_router); -nlohmann::json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_router); -nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_router); nlohmann::json ToJson(const Aggregate::Element &elem); @@ -161,7 +160,7 @@ nlohmann::json ToJson(const std::vector &items, Args &&...args) { class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { public: - explicit PlanToJsonVisitor(const ShardRequestManagerInterface *request_manager) : request_manager_(request_manager) {} + explicit PlanToJsonVisitor(const RequestRouterInterface *request_router) : request_router_(request_router) {} using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; @@ -217,7 +216,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { protected: nlohmann::json output_; - const ShardRequestManagerInterface *request_manager_; + const RequestRouterInterface *request_router_; nlohmann::json PopOutput() { nlohmann::json tmp; diff --git a/src/query/v2/plan/rule_based_planner.hpp b/src/query/v2/plan/rule_based_planner.hpp index 884693c2f..2b6857aea 100644 --- a/src/query/v2/plan/rule_based_planner.hpp +++ b/src/query/v2/plan/rule_based_planner.hpp @@ -272,7 +272,7 @@ class RuleBasedPlanner { PropertiesMapList vector_props; vector_props.reserve(node_properties->size()); for (const auto &kv : *node_properties) { - // TODO(kostasrim) GetProperty should be implemented in terms of ShardRequestManager NameToProperty + // TODO(kostasrim) GetProperty should be implemented in terms of RequestRouter NameToProperty vector_props.push_back({GetProperty(kv.first), kv.second}); } return std::move(vector_props); diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index 47a10ba3e..e68ce1220 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -15,7 +15,7 @@ #include #include "query/v2/bindings/typed_value.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/conversions.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" @@ -29,11 +29,11 @@ namespace memgraph::query::v2::plan { template class VertexCountCache { public: - explicit VertexCountCache(TDbAccessor *shard_request_manager) : shard_request_manager_{shard_request_manager} {} + explicit VertexCountCache(TDbAccessor *request_router) : request_router_{request_router} {} - auto NameToLabel(const std::string &name) { return shard_request_manager_->NameToLabel(name); } - auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); } - auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); } + auto NameToLabel(const std::string &name) { return request_router_->NameToLabel(name); } + auto NameToProperty(const std::string &name) { return request_router_->NameToProperty(name); } + auto NameToEdgeType(const std::string &name) { return request_router_->NameToEdgeType(name); } int64_t VerticesCount() { return 1; } @@ -53,11 +53,11 @@ class VertexCountCache { } // For now return true if label is primary label - bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); } + bool LabelIndexExists(storage::v3::LabelId label) { return request_router_->IsPrimaryLabel(label); } bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } - ShardRequestManagerInterface *shard_request_manager_; + RequestRouterInterface *request_router_; }; template diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/request_router.hpp similarity index 96% rename from src/query/v2/shard_request_manager.hpp rename to src/query/v2/request_router.hpp index 581dfbcab..fbf8c514c 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/request_router.hpp @@ -83,10 +83,10 @@ struct ExecutionState { // CompoundKey is optional because some operators require to iterate over all the available keys // of a shard. One example is ScanAll, where we only require the field label. std::optional key; - // Transaction id to be filled by the ShardRequestManager implementation + // Transaction id to be filled by the RequestRouter implementation coordinator::Hlc transaction_id; - // Initialized by ShardRequestManager implementation. This vector is filled with the shards that - // the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that + // Initialized by RequestRouter implementation. This vector is filled with the shards that + // the RequestRouter impl will send requests to. When a request to a shard exhausts it, meaning that // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes // empty, it means that all of the requests have completed succefully. // TODO(gvolfing) @@ -101,16 +101,16 @@ struct ExecutionState { State state = INITIALIZING; }; -class ShardRequestManagerInterface { +class RequestRouterInterface { public: using VertexAccessor = query::v2::accessors::VertexAccessor; - ShardRequestManagerInterface() = default; - ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; - ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete; + RequestRouterInterface() = default; + RequestRouterInterface(const RequestRouterInterface &) = delete; + RequestRouterInterface(RequestRouterInterface &&) = delete; + RequestRouterInterface &operator=(const RequestRouterInterface &) = delete; + RequestRouterInterface &&operator=(RequestRouterInterface &&) = delete; - virtual ~ShardRequestManagerInterface() = default; + virtual ~RequestRouterInterface() = default; virtual void StartTransaction() = 0; virtual void Commit() = 0; @@ -137,7 +137,7 @@ class ShardRequestManagerInterface { // TODO(kostasrim)rename this class template template -class ShardRequestManager : public ShardRequestManagerInterface { +class RequestRouter : public RequestRouterInterface { public: using StorageClient = coordinator::RsmClient; @@ -148,15 +148,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { using ShardMap = coordinator::ShardMap; using CompoundKey = coordinator::PrimaryKey; using VertexAccessor = query::v2::accessors::VertexAccessor; - ShardRequestManager(CoordinatorClient coord, io::Io &&io) - : coord_cli_(std::move(coord)), io_(std::move(io)) {} + RequestRouter(CoordinatorClient coord, io::Io &&io) : coord_cli_(std::move(coord)), io_(std::move(io)) {} - ShardRequestManager(const ShardRequestManager &) = delete; - ShardRequestManager(ShardRequestManager &&) = delete; - ShardRequestManager &operator=(const ShardRequestManager &) = delete; - ShardRequestManager &operator=(ShardRequestManager &&) = delete; + RequestRouter(const RequestRouter &) = delete; + RequestRouter(RequestRouter &&) = delete; + RequestRouter &operator=(const RequestRouter &) = delete; + RequestRouter &operator=(RequestRouter &&) = delete; - ~ShardRequestManager() override {} + ~RequestRouter() override {} void StartTransaction() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; diff --git a/tests/simulation/shard_request_manager.cpp b/tests/simulation/request_router.cpp similarity index 90% rename from tests/simulation/shard_request_manager.cpp rename to tests/simulation/request_router.cpp index 9db18659e..8187f138b 100644 --- a/tests/simulation/shard_request_manager.cpp +++ b/tests/simulation/request_router.cpp @@ -31,8 +31,8 @@ #include "io/simulator/simulator_transport.hpp" #include "query/v2/accessors.hpp" #include "query/v2/conversions.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/property_value.hpp" #include "utils/result.hpp" @@ -151,10 +151,10 @@ void RunStorageRaft(Raft state{.label = "test_label"}; - auto result = io.Request(state); + auto result = request_router.Request(state); MG_ASSERT(result.size() == 2); { auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); @@ -163,7 +163,7 @@ void TestScanVertices(query::v2::ShardRequestManagerInterface &io) { MG_ASSERT(prop.int_v == 444); } - result = io.Request(state); + result = request_router.Request(state); { MG_ASSERT(result.size() == 1); auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); @@ -171,11 +171,11 @@ void TestScanVertices(query::v2::ShardRequestManagerInterface &io) { } } -void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { +void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_vertices; - auto label_id = io.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; @@ -183,17 +183,17 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); - auto result = io.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); MG_ASSERT(result.size() == 2); } -void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { +void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_expands; - const auto edge_type_id = io.NameToEdgeType("edge_type"); - const auto label = msgs::Label{io.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; msgs::NewExpand expand_1{ @@ -203,26 +203,26 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_2)); - auto responses = io.Request(state, std::move(new_expands)); + auto responses = request_router.Request(state, std::move(new_expands)); MG_ASSERT(responses.size() == 2); MG_ASSERT(responses[0].success); MG_ASSERT(responses[1].success); } -void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestExpandOne(query::v2::RequestRouterInterface &request_router) { msgs::ExecutionState state{}; msgs::ExpandOneRequest request; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.direction = msgs::EdgeDirection::BOTH; - auto result_rows = shard_request_manager.Request(state, std::move(request)); + auto result_rows = request_router.Request(state, std::move(request)); MG_ASSERT(result_rows.size() == 2); } -template -void TestAggregate(ShardRequestManager &io) {} +template +void TestAggregate(RequestRouter &request_router) {} void DoTest() { SimulatorConfig config{ @@ -337,12 +337,12 @@ void DoTest() { // also get the current shard map CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); - query::v2::ShardRequestManager io(std::move(coordinator_client), std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - io.StartTransaction(); - TestScanVertices(io); - TestCreateVertices(io); - TestCreateExpand(io); + request_router.StartTransaction(); + TestScanVertices(request_router); + TestCreateVertices(request_router); + TestCreateExpand(request_router); simulator.ShutDown(); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index fd62b99f9..99f617cda 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -30,8 +30,8 @@ #include "io/simulator/simulator_transport.hpp" #include "machine_manager/machine_config.hpp" #include "machine_manager/machine_manager.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "testing_constants.hpp" #include "utils/print_helpers.hpp" #include "utils/variant_helpers.hpp" @@ -151,8 +151,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) { return sm; } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, CreateVertex create_vertex) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -166,7 +166,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request query::v2::ExecutionState state; - auto label_id = shard_request_manager.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex nv{.primary_key = primary_key}; nv.label_ids.push_back({label_id}); @@ -174,7 +174,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request std::vector new_vertices; new_vertices.push_back(std::move(nv)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); RC_ASSERT(result.size() == 1); RC_ASSERT(!result[0].error.has_value()); @@ -182,11 +182,11 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, ScanAll scan_all) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + ScanAll scan_all) { query::v2::ExecutionState request{.label = "test_label"}; - auto results = shard_request_manager.Request(request); + auto results = request_router.Request(request); RC_ASSERT(results.size() == correctness_model.size()); @@ -247,15 +247,14 @@ std::pair RunClusterSimulation(const CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); WaitForShardsToInitialize(coordinator_client); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); + request_router.StartTransaction(); auto correctness_model = std::set{}; for (const Op &op : ops) { - std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner); + std::visit([&](auto &o) { ExecuteOp(request_router, correctness_model, o); }, op.inner); } // We have now completed our workload without failing any assertions, so we can diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp index e586a3556..22af9c702 100644 --- a/tests/unit/high_density_shard_create_scan.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -29,8 +29,8 @@ #include "io/simulator/simulator_transport.hpp" #include "machine_manager/machine_config.hpp" #include "machine_manager/machine_manager.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "utils/variant_helpers.hpp" namespace memgraph::tests::simulation { @@ -161,8 +161,8 @@ ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards return sm; } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, CreateVertex create_vertex) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -176,7 +176,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man query::v2::ExecutionState state; - auto label_id = shard_request_manager.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex nv{.primary_key = primary_key}; nv.label_ids.push_back({label_id}); @@ -184,7 +184,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man std::vector new_vertices; new_vertices.push_back(std::move(nv)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); MG_ASSERT(result.size() == 1); MG_ASSERT(!result[0].error.has_value()); @@ -192,11 +192,11 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, ScanAll scan_all) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + ScanAll scan_all) { query::v2::ExecutionState request{.label = "test_label"}; - auto results = shard_request_manager.Request(request); + auto results = request_router.Request(request); MG_ASSERT(results.size() == correctness_model.size()); @@ -245,23 +245,22 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op WaitForShardsToInitialize(coordinator_client); auto time_after_shard_stabilization = cli_io_2.Now(); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); + request_router.StartTransaction(); auto correctness_model = std::set{}; auto time_before_creates = cli_io_2.Now(); for (int i = 0; i < create_ops; i++) { - ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i}); + ExecuteOp(request_router, correctness_model, CreateVertex{.first = i, .second = i}); } auto time_after_creates = cli_io_2.Now(); for (int i = 0; i < scan_ops; i++) { - ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); + ExecuteOp(request_router, correctness_model, ScanAll{}); } auto time_after_scan = cli_io_2.Now(); diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 834523fee..0b081e5a1 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -27,7 +27,7 @@ #include #include #include "io/rsm/rsm_client.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/schemas.hpp" @@ -109,19 +109,19 @@ ShardMap TestShardMap() { return sm; } -template -void TestScanAll(ShardRequestManager &shard_request_manager) { +template +void TestScanAll(RequestRouter &request_router) { query::v2::ExecutionState state{.label = kLabelName}; - auto result = shard_request_manager.Request(state); + auto result = request_router.Request(state); EXPECT_EQ(result.size(), 2); } -void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; query::v2::ExecutionState state; std::vector new_vertices; - auto label_id = shard_request_manager.NameToLabel(kLabelName); + auto label_id = request_router.NameToLabel(kLabelName); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; @@ -129,18 +129,18 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_m new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); EXPECT_EQ(result.size(), 1); EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message; } -void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; query::v2::ExecutionState state; std::vector new_expands; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; msgs::NewExpand expand_1{ @@ -150,27 +150,27 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_man new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_2)); - auto responses = shard_request_manager.Request(state, std::move(new_expands)); + auto responses = request_router.Request(state, std::move(new_expands)); MG_ASSERT(responses.size() == 1); MG_ASSERT(!responses[0].error.has_value()); } -void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestExpandOne(query::v2::RequestRouterInterface &request_router) { query::v2::ExecutionState state{}; msgs::ExpandOneRequest request; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.direction = msgs::EdgeDirection::BOTH; - auto result_rows = shard_request_manager.Request(state, std::move(request)); + auto result_rows = request_router.Request(state, std::move(request)); MG_ASSERT(result_rows.size() == 1); MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1); MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1); } -template -void TestAggregate(ShardRequestManager &shard_request_manager) {} +template +void TestAggregate(RequestRouter &request_router) {} MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, ShardMap shard_map) { @@ -226,14 +226,13 @@ TEST(MachineManager, BasicFunctionality) { CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); - TestCreateVertices(shard_request_manager); - TestScanAll(shard_request_manager); - TestCreateExpand(shard_request_manager); - TestExpandOne(shard_request_manager); + request_router.StartTransaction(); + TestCreateVertices(request_router); + TestScanAll(request_router); + TestCreateExpand(request_router); + TestExpandOne(request_router); local_system.ShutDown(); }; diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index 1fb4cf6dd..112ecd29e 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -27,8 +27,8 @@ #include "query/v2/bindings/frame.hpp" #include "query/v2/context.hpp" #include "query/v2/frontend/ast/ast.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "query_v2_query_common.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/shard.hpp" @@ -65,10 +65,10 @@ using memgraph::functions::FunctionRuntimeException; namespace memgraph::query::v2::tests { -class MockedShardRequestManager : public ShardRequestManagerInterface { +class MockedRequestRouter : public RequestRouterInterface { public: using VertexAccessor = accessors::VertexAccessor; - explicit MockedShardRequestManager(ShardMap shard_map) : shards_map_(std::move(shard_map)) { SetUpNameIdMappers(); } + explicit MockedRequestRouter(ShardMap shard_map) : shards_map_(std::move(shard_map)) { SetUpNameIdMappers(); } memgraph::storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override { return shards_map_.GetEdgeTypeId(name).value(); } @@ -215,9 +215,8 @@ class ExpressionEvaluatorTest : public ::testing::Test { SymbolTable symbol_table; Frame frame{128}; - std::unique_ptr shard_manager = - std::make_unique(CreateDummyShardmap()); - ExpressionEvaluator eval{&frame, symbol_table, ctx, shard_manager.get(), memgraph::storage::v3::View::OLD}; + std::unique_ptr request_router = std::make_unique(CreateDummyShardmap()); + ExpressionEvaluator eval{&frame, symbol_table, ctx, request_router.get(), memgraph::storage::v3::View::OLD}; Identifier *CreateIdentifierWithValue(std::string name, const TypedValue &value) { auto id = storage.Create(name, true); @@ -229,8 +228,8 @@ class ExpressionEvaluatorTest : public ::testing::Test { template auto Eval(TExpression *expr) { - ctx.properties = NamesToProperties(storage.properties_, shard_manager.get()); - ctx.labels = NamesToLabels(storage.labels_, shard_manager.get()); + ctx.properties = NamesToProperties(storage.properties_, request_router.get()); + ctx.labels = NamesToLabels(storage.labels_, request_router.get()); auto value = expr->Accept(eval); EXPECT_EQ(value.GetMemoryResource(), &mem) << "ExpressionEvaluator must use the MemoryResource from " "EvaluationContext for allocations!"; @@ -544,25 +543,25 @@ using VertexId = memgraph::msgs::VertexId; using Label = memgraph::msgs::Label; accessors::VertexAccessor CreateVertex(std::vector> props, - const ShardRequestManagerInterface *manager, Vertex v = {}) { - return {std::move(v), std::move(props), manager}; + const RequestRouterInterface *request_router, Vertex v = {}) { + return {std::move(v), std::move(props), request_router}; } accessors::EdgeAccessor CreateEdge(std::vector> props, - const ShardRequestManagerInterface *manager, EdgeId edge_id = {}, VertexId src = {}, + const RequestRouterInterface *request_router, EdgeId edge_id = {}, VertexId src = {}, VertexId dst = {}) { auto edge = Edge{.src = std::move(src), .dst = std::move(dst), .properties = std::move(props), .id = edge_id, - .type = EdgeType{manager->NameToEdgeType("edge_type")}}; - return accessors::EdgeAccessor{std::move(edge), manager}; + .type = EdgeType{request_router->NameToEdgeType("edge_type")}}; + return accessors::EdgeAccessor{std::move(edge), request_router}; } TEST_F(ExpressionEvaluatorTest, VertexAndEdgeIndexing) { - auto prop = shard_manager->NameToProperty("prop"); - auto vertex = CreateVertex({{prop, Value(static_cast(42))}}, shard_manager.get(), {}); - auto edge = CreateEdge({{prop, Value(static_cast(43))}}, shard_manager.get(), {}, {}, {}); + auto prop = request_router->NameToProperty("prop"); + auto vertex = CreateVertex({{prop, Value(static_cast(42))}}, request_router.get(), {}); + auto edge = CreateEdge({{prop, Value(static_cast(43))}}, request_router.get(), {}, {}, {}); auto *vertex_id = CreateIdentifierWithValue("v1", TypedValue(vertex)); auto *edge_id = CreateIdentifierWithValue("e11", TypedValue(edge)); @@ -741,9 +740,9 @@ TEST_F(ExpressionEvaluatorTest, IsNullOperator) { } TEST_F(ExpressionEvaluatorTest, LabelsTest) { - Label label{shard_manager->NameToLabel("label1")}; + Label label{request_router->NameToLabel("label1")}; Vertex vertex = {{}, {label}}; - auto v1 = CreateVertex({}, shard_manager.get(), vertex); + auto v1 = CreateVertex({}, request_router.get(), vertex); auto *identifier = storage.Create("n"); auto node_symbol = symbol_table.CreateSymbol("n", true); identifier->MapTo(node_symbol); @@ -1121,9 +1120,9 @@ TEST_F(ExpressionEvaluatorTest, RegexMatch) { class ExpressionEvaluatorPropertyLookup : public ExpressionEvaluatorTest { protected: std::pair prop_age = - std::make_pair("age", shard_manager->NameToProperty("age")); + std::make_pair("age", request_router->NameToProperty("age")); std::pair prop_height = - std::make_pair("height", shard_manager->NameToProperty("height")); + std::make_pair("height", request_router->NameToProperty("height")); Identifier *identifier = storage.Create("element"); Symbol symbol = symbol_table.CreateSymbol("element", true); @@ -1191,10 +1190,10 @@ static TypedValue MakeTypedValueList(TArgs &&...args) { TEST_F(FunctionTest, EndNode) { ASSERT_THROW(EvaluateFunction("ENDNODE"), FunctionRuntimeException); ASSERT_TRUE(EvaluateFunction("ENDNODE", TypedValue()).IsNull()); - Label l{shard_manager->NameToLabel("label1")}; + Label l{request_router->NameToLabel("label1")}; EdgeId e_id{10}; VertexId dst{l, {msgs::Value(static_cast(2))}}; - auto e = CreateEdge({}, shard_manager.get(), e_id, {}, dst); + auto e = CreateEdge({}, request_router.get(), e_id, {}, dst); const auto res = EvaluateFunction("ENDNODE", e).ValueVertex().Id(); ASSERT_EQ(res, dst); ASSERT_THROW(EvaluateFunction("ENDNODE", 2), FunctionRuntimeException); @@ -1213,12 +1212,12 @@ TEST_F(FunctionTest, Head) { TEST_F(FunctionTest, Properties) { ASSERT_THROW(EvaluateFunction("PROPERTIES"), FunctionRuntimeException); ASSERT_TRUE(EvaluateFunction("PROPERTIES", TypedValue()).IsNull()); - const auto height = shard_manager->NameToProperty("height"); - const auto age = shard_manager->NameToProperty("age"); + const auto height = request_router->NameToProperty("height"); + const auto age = request_router->NameToProperty("age"); auto v1 = CreateVertex({{height, Value(static_cast(5))}, {age, Value(static_cast(10))}}, - shard_manager.get()); + request_router.get()); auto e = CreateEdge({{height, Value(static_cast(3))}, {age, Value(static_cast(15))}}, - shard_manager.get()); + request_router.get()); auto prop_values_to_int = [](TypedValue t) { std::unordered_map properties; @@ -1262,10 +1261,10 @@ TEST_F(FunctionTest, Size) { TEST_F(FunctionTest, StartNode) { ASSERT_THROW(EvaluateFunction("STARTNODE"), FunctionRuntimeException); ASSERT_TRUE(EvaluateFunction("STARTNODE", TypedValue()).IsNull()); - Label l{shard_manager->NameToLabel("label1")}; + Label l{request_router->NameToLabel("label1")}; EdgeId e_id{5}; VertexId src{l, {msgs::Value(static_cast(4))}}; - auto e = CreateEdge({}, shard_manager.get(), e_id, src); + auto e = CreateEdge({}, request_router.get(), e_id, src); const auto res = EvaluateFunction("STARTNODE", e).ValueVertex().Id(); ASSERT_EQ(res, src); ASSERT_THROW(EvaluateFunction("STARTNODE", 2), FunctionRuntimeException); @@ -1310,7 +1309,7 @@ TEST_F(FunctionTest, ToInteger) { TEST_F(FunctionTest, Type) { ASSERT_THROW(EvaluateFunction("TYPE"), FunctionRuntimeException); ASSERT_TRUE(EvaluateFunction("TYPE", TypedValue()).IsNull()); - auto e = CreateEdge({}, shard_manager.get()); + auto e = CreateEdge({}, request_router.get()); ASSERT_EQ(EvaluateFunction("TYPE", e).ValueString(), "edge_type"); ASSERT_THROW(EvaluateFunction("TYPE", 2), FunctionRuntimeException); } @@ -1329,17 +1328,17 @@ TEST_F(FunctionTest, ValueType) { ASSERT_EQ(EvaluateFunction("VALUETYPE", TypedValue(std::map{{"test", TypedValue(1)}})) .ValueString(), "MAP"); - auto v1 = CreateVertex({}, shard_manager.get()); + auto v1 = CreateVertex({}, request_router.get()); ASSERT_EQ(EvaluateFunction("VALUETYPE", v1).ValueString(), "NODE"); - auto e = CreateEdge({}, shard_manager.get()); + auto e = CreateEdge({}, request_router.get()); ASSERT_EQ(EvaluateFunction("VALUETYPE", e).ValueString(), "RELATIONSHIP"); } TEST_F(FunctionTest, Labels) { ASSERT_THROW(EvaluateFunction("LABELS"), FunctionRuntimeException); ASSERT_TRUE(EvaluateFunction("LABELS", TypedValue()).IsNull()); - Label label{shard_manager->NameToLabel("label1")}; - auto v = CreateVertex({}, shard_manager.get(), {{}, {label}}); + Label label{request_router->NameToLabel("label1")}; + auto v = CreateVertex({}, request_router.get(), {{}, {label}}); std::vector labels; auto evaluated_labels = EvaluateFunction("LABELS", v).ValueList(); labels.reserve(evaluated_labels.size()); @@ -1556,9 +1555,9 @@ TEST_F(FunctionTest, Counter) { } TEST_F(FunctionTest, Id) { - auto v = CreateVertex({}, shard_manager.get()); + auto v = CreateVertex({}, request_router.get()); EXPECT_THROW(EvaluateFunction("ID", v), FunctionRuntimeException); - auto e = CreateEdge({}, shard_manager.get(), EdgeId{10}); + auto e = CreateEdge({}, request_router.get(), EdgeId{10}); EXPECT_EQ(EvaluateFunction("ID", e).ValueInt(), 10); }