From 82db1d4ad8ddfa2765ebaa021ae4f7e0253a9b61 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 28 Nov 2022 12:38:38 +0000 Subject: [PATCH 1/3] Rename ShardRequestManager to RequestRouter --- src/glue/v2/communication.cpp | 30 +++--- src/glue/v2/communication.hpp | 24 ++--- src/memgraph.cpp | 13 ++- src/query/v2/accessors.cpp | 11 +-- src/query/v2/accessors.hpp | 15 ++- src/query/v2/bindings/eval.hpp | 11 +-- src/query/v2/context.hpp | 16 ++-- src/query/v2/conversions.hpp | 4 +- src/query/v2/cypher_query_interpreter.cpp | 6 +- src/query/v2/cypher_query_interpreter.hpp | 4 +- .../interpret/awesome_memgraph_functions.cpp | 2 +- .../interpret/awesome_memgraph_functions.hpp | 6 +- src/query/v2/interpreter.cpp | 96 +++++++++---------- src/query/v2/interpreter.hpp | 4 +- src/query/v2/plan/operator.cpp | 66 ++++++------- src/query/v2/plan/pretty_print.cpp | 21 ++-- src/query/v2/plan/pretty_print.hpp | 31 +++--- src/query/v2/plan/rule_based_planner.hpp | 2 +- src/query/v2/plan/vertex_count_cache.hpp | 14 +-- ...request_manager.hpp => request_router.hpp} | 35 ++++--- ...request_manager.cpp => request_router.cpp} | 22 ++--- tests/simulation/test_cluster.hpp | 23 +++-- tests/unit/high_density_shard_create_scan.cpp | 25 +++-- tests/unit/machine_manager.cpp | 47 +++++---- 24 files changed, 255 insertions(+), 273 deletions(-) rename src/query/v2/{shard_request_manager.hpp => request_router.hpp} (96%) rename tests/simulation/{shard_request_manager.cpp => request_router.cpp} (94%) diff --git a/src/glue/v2/communication.cpp b/src/glue/v2/communication.cpp index 3406105b0..42228652f 100644 --- a/src/glue/v2/communication.cpp +++ b/src/glue/v2/communication.cpp @@ -18,8 +18,8 @@ #include "common/errors.hpp" #include "coordinator/shard_map.hpp" #include "query/v2/accessors.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/edge_accessor.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/shard.hpp" @@ -72,7 +72,7 @@ query::v2::TypedValue ToTypedValue(const Value &value) { } communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View /*view*/) { auto id = communication::bolt::Id::FromUint(0); @@ -80,44 +80,44 @@ communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAcces std::vector new_labels; new_labels.reserve(labels.size()); for (const auto &label : labels) { - new_labels.push_back(shard_request_manager->LabelToName(label.id)); + new_labels.push_back(request_router->LabelToName(label.id)); } auto properties = vertex.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); + new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Vertex{id, new_labels, new_properties}; } communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication auto id = communication::bolt::Id::FromUint(0); auto from = communication::bolt::Id::FromUint(0); auto to = communication::bolt::Id::FromUint(0); - const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType()); + const auto &type = request_router->EdgeTypeToName(edge.EdgeType()); auto properties = edge.Properties(); std::map new_properties; for (const auto &[prop, property_value] : properties) { - new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); + new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value); } return communication::bolt::Edge{id, from, to, type, new_properties}; } communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/, - const query::v2::ShardRequestManagerInterface * /*shard_request_manager*/, + const query::v2::RequestRouterInterface * /*request_router*/, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication MG_ASSERT(false, "Path is unimplemented!"); return {}; } -Value ToBoltValue(const query::v2::TypedValue &value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) { +Value ToBoltValue(const query::v2::TypedValue &value, const query::v2::RequestRouterInterface *request_router, + storage::v3::View view) { switch (value.type()) { case query::v2::TypedValue::Type::Null: return {}; @@ -133,7 +133,7 @@ Value ToBoltValue(const query::v2::TypedValue &value, std::vector values; values.reserve(value.ValueList().size()); for (const auto &v : value.ValueList()) { - auto value = ToBoltValue(v, shard_request_manager, view); + auto value = ToBoltValue(v, request_router, view); values.emplace_back(std::move(value)); } return {std::move(values)}; @@ -141,21 +141,21 @@ Value ToBoltValue(const query::v2::TypedValue &value, case query::v2::TypedValue::Type::Map: { std::map map; for (const auto &kv : value.ValueMap()) { - auto value = ToBoltValue(kv.second, shard_request_manager, view); + auto value = ToBoltValue(kv.second, request_router, view); map.emplace(kv.first, std::move(value)); } return {std::move(map)}; } case query::v2::TypedValue::Type::Vertex: { - auto vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view); + auto vertex = ToBoltVertex(value.ValueVertex(), request_router, view); return {std::move(vertex)}; } case query::v2::TypedValue::Type::Edge: { - auto edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view); + auto edge = ToBoltEdge(value.ValueEdge(), request_router, view); return {std::move(edge)}; } case query::v2::TypedValue::Type::Path: { - auto path = ToBoltPath(value.ValuePath(), shard_request_manager, view); + auto path = ToBoltPath(value.ValuePath(), request_router, view); return {std::move(path)}; } case query::v2::TypedValue::Type::Date: diff --git a/src/glue/v2/communication.hpp b/src/glue/v2/communication.hpp index 817ff02bd..c1661b521 100644 --- a/src/glue/v2/communication.hpp +++ b/src/glue/v2/communication.hpp @@ -15,7 +15,7 @@ #include "communication/bolt/v1/value.hpp" #include "coordinator/shard_map.hpp" #include "query/v2/bindings/typed_value.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/result.hpp" #include "storage/v3/shard.hpp" @@ -32,40 +32,37 @@ namespace memgraph::glue::v2 { /// @param storage::v3::VertexAccessor for converting to /// communication::bolt::Vertex. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting label and property names. +/// @param query::v2::RequestRouterInterface *request_router getting label and property names. /// @param storage::v3::View for deciding which vertex attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex, - const query::v2::ShardRequestManagerInterface *shard_request_manager, + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. +/// @param query::v2::RequestRouterInterface *request_router getting edge type and property names. /// @param storage::v3::View for deciding which edge attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param query::v2::Path for converting to communication::bolt::Path. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); /// @param query::v2::TypedValue for converting to communication::bolt::Value. -/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view); + const query::v2::RequestRouterInterface *request_router, storage::v3::View view); query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value); @@ -75,8 +72,7 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val communication::bolt::Value ToBoltValue(msgs::Value value); -communication::bolt::Value ToBoltValue(msgs::Value value, - const query::v2::ShardRequestManagerInterface *shard_request_manager, +communication::bolt::Value ToBoltValue(msgs::Value value, const query::v2::RequestRouterInterface *request_router, storage::v3::View view); } // namespace memgraph::glue::v2 diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 565f9940c..d825cc0e7 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -454,7 +454,7 @@ class BoltSession final : public memgraph::communication::bolt::Session Pull(TEncoder *encoder, std::optional n, std::optional qid) override { - TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager()); + TypedValueResultStream stream(encoder, interpreter_.GetRequestRouter()); return PullResults(stream, n, qid); } @@ -481,7 +481,7 @@ class BoltSession final : public memgraph::communication::bolt::Session decoded_summary; for (const auto &kv : summary) { - auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(), + auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetRequestRouter(), memgraph::storage::v3::View::NEW); decoded_summary.emplace(kv.first, std::move(bolt_value)); } @@ -497,15 +497,14 @@ class BoltSession final : public memgraph::communication::bolt::Session &values) { std::vector decoded_values; decoded_values.reserve(values.size()); for (const auto &v : values) { - auto bolt_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW); + auto bolt_value = memgraph::glue::v2::ToBoltValue(v, request_router_, memgraph::storage::v3::View::NEW); decoded_values.emplace_back(std::move(bolt_value)); } encoder_->MessageRecord(decoded_values); @@ -513,7 +512,7 @@ class BoltSession final : public memgraph::communication::bolt::Session> props, - const ShardRequestManagerInterface *manager) + const RequestRouterInterface *manager) : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} -VertexAccessor::VertexAccessor(Vertex v, std::map &&props, - const ShardRequestManagerInterface *manager) +VertexAccessor::VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *manager) : vertex(std::move(v)), manager_(manager) { properties.reserve(props.size()); for (auto &[id, value] : props) { @@ -57,7 +56,7 @@ VertexAccessor::VertexAccessor(Vertex v, std::map &&props, } VertexAccessor::VertexAccessor(Vertex v, const std::map &props, - const ShardRequestManagerInterface *manager) + const RequestRouterInterface *manager) : vertex(std::move(v)), manager_(manager) { properties.reserve(props.size()); for (const auto &[id, value] : props) { diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index ca7ec999d..b5585953f 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -25,7 +25,7 @@ #include "utils/memory_tracker.hpp" namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; } // namespace memgraph::query::v2 namespace memgraph::query::v2::accessors { @@ -41,7 +41,7 @@ class VertexAccessor; class EdgeAccessor final { public: - explicit EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager); + explicit EdgeAccessor(Edge edge, const RequestRouterInterface *manager); [[nodiscard]] EdgeTypeId EdgeType() const; @@ -69,7 +69,7 @@ class EdgeAccessor final { private: Edge edge; - const ShardRequestManagerInterface *manager_; + const RequestRouterInterface *manager_; }; class VertexAccessor final { @@ -77,11 +77,10 @@ class VertexAccessor final { using PropertyId = msgs::PropertyId; using Label = msgs::Label; using VertexId = msgs::VertexId; - VertexAccessor(Vertex v, std::vector> props, - const ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, std::vector> props, const RequestRouterInterface *manager); - VertexAccessor(Vertex v, std::map &&props, const ShardRequestManagerInterface *manager); - VertexAccessor(Vertex v, const std::map &props, const ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *manager); + VertexAccessor(Vertex v, const std::map &props, const RequestRouterInterface *manager); [[nodiscard]] Label PrimaryLabel() const; @@ -150,7 +149,7 @@ class VertexAccessor final { private: Vertex vertex; std::vector> properties; - const ShardRequestManagerInterface *manager_; + const RequestRouterInterface *manager_; }; // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); } diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp index 584e88922..912850e87 100644 --- a/src/query/v2/bindings/eval.hpp +++ b/src/query/v2/bindings/eval.hpp @@ -26,7 +26,7 @@ namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); }; namespace detail { @@ -35,15 +35,14 @@ class Callable { auto operator()(const storage::v3::PropertyValue &val) const { return storage::v3::PropertyToTypedValue(val); }; - auto operator()(const msgs::Value &val, ShardRequestManagerInterface *manager) const { + auto operator()(const msgs::Value &val, RequestRouterInterface *manager) const { return ValueToTypedValue(val, manager); }; }; } // namespace detail -using ExpressionEvaluator = - expr::ExpressionEvaluator; +using ExpressionEvaluator = expr::ExpressionEvaluator; } // namespace memgraph::query::v2 diff --git a/src/query/v2/context.hpp b/src/query/v2/context.hpp index 388342349..cb30a9ced 100644 --- a/src/query/v2/context.hpp +++ b/src/query/v2/context.hpp @@ -20,7 +20,7 @@ #include "query/v2/parameters.hpp" #include "query/v2/plan/profile.hpp" //#include "query/v2/trigger.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/async_timer.hpp" namespace memgraph::query::v2 { @@ -61,26 +61,26 @@ struct EvaluationContext { }; inline std::vector NamesToProperties(const std::vector &property_names, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { std::vector properties; // TODO Fix by using reference properties.reserve(property_names.size()); - if (shard_request_manager != nullptr) { + if (request_router != nullptr) { for (const auto &name : property_names) { - properties.push_back(shard_request_manager->NameToProperty(name)); + properties.push_back(request_router->NameToProperty(name)); } } return properties; } inline std::vector NamesToLabels(const std::vector &label_names, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { std::vector labels; labels.reserve(label_names.size()); // TODO Fix by using reference - if (shard_request_manager != nullptr) { + if (request_router != nullptr) { for (const auto &name : label_names) { - labels.push_back(shard_request_manager->NameToLabel(name)); + labels.push_back(request_router->NameToLabel(name)); } } return labels; @@ -97,7 +97,7 @@ struct ExecutionContext { plan::ProfilingStats *stats_root{nullptr}; ExecutionStats execution_stats; utils::AsyncTimer timer; - ShardRequestManagerInterface *shard_request_manager{nullptr}; + RequestRouterInterface *request_router{nullptr}; IdAllocator *edge_ids_alloc; }; diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index c18ba8cc0..161f89979 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -12,12 +12,12 @@ #pragma once #include "bindings/typed_value.hpp" #include "query/v2/accessors.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" namespace memgraph::query::v2 { -inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManagerInterface *manager) { +inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInterface *manager) { using Value = msgs::Value; switch (value.type) { case Value::Type::Null: diff --git a/src/query/v2/cypher_query_interpreter.cpp b/src/query/v2/cypher_query_interpreter.cpp index 908ee36cb..880165ad8 100644 --- a/src/query/v2/cypher_query_interpreter.cpp +++ b/src/query/v2/cypher_query_interpreter.cpp @@ -11,7 +11,7 @@ #include "query/v2/cypher_query_interpreter.hpp" #include "query/v2/bindings/symbol_generator.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); @@ -118,7 +118,7 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *shard_manager, const std::vector &predefined_identifiers) { auto vertex_counts = plan::MakeVertexCountCache(shard_manager); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); @@ -130,7 +130,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *shard_manager, const std::vector &predefined_identifiers) { std::optional::Accessor> plan_cache_access; if (plan_cache) { diff --git a/src/query/v2/cypher_query_interpreter.hpp b/src/query/v2/cypher_query_interpreter.hpp index b7f63ab8f..c9d65047c 100644 --- a/src/query/v2/cypher_query_interpreter.hpp +++ b/src/query/v2/cypher_query_interpreter.hpp @@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan { }; std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *shard_manager, const std::vector &predefined_identifiers); /** @@ -145,7 +145,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery */ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - ShardRequestManagerInterface *shard_manager, + RequestRouterInterface *shard_manager, const std::vector &predefined_identifiers = {}); } // namespace memgraph::query::v2 diff --git a/src/query/v2/interpret/awesome_memgraph_functions.cpp b/src/query/v2/interpret/awesome_memgraph_functions.cpp index 8768736b2..54fcd4468 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.cpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.cpp @@ -23,7 +23,7 @@ #include "query/v2/bindings/typed_value.hpp" #include "query/v2/conversions.hpp" #include "query/v2/exceptions.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/conversions.hpp" #include "utils/string.hpp" #include "utils/temporal.hpp" diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp index 1fd351cd8..35ed1d797 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.hpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp @@ -22,7 +22,7 @@ namespace memgraph::query::v2 { -class ShardRequestManagerInterface; +class RequestRouterInterface; namespace { const char kStartsWith[] = "STARTSWITH"; @@ -32,9 +32,9 @@ const char kId[] = "ID"; } // namespace struct FunctionContext { - // TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage. + // TODO(kostasrim) consider optional here. RequestRouter does not exist on the storage. // DbAccessor *db_accessor; - ShardRequestManagerInterface *manager; + RequestRouterInterface *manager; utils::MemoryResource *memory; int64_t timestamp; std::unordered_map *counters; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index f9dc37184..94ff7a8ff 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -44,7 +44,7 @@ #include "query/v2/plan/planner.hpp" #include "query/v2/plan/profile.hpp" #include "query/v2/plan/vertex_count_cache.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/property_value.hpp" #include "storage/v3/shard.hpp" #include "utils/algorithm.hpp" @@ -143,7 +143,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *manager) { // Empty frame for evaluation of password expression. This is OK since // password should be either null or string literal and it's evaluation // should not depend on frame. @@ -312,7 +312,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, - InterpreterContext *interpreter_context, ShardRequestManagerInterface *manager, + InterpreterContext *interpreter_context, RequestRouterInterface *manager, std::vector *notifications) { expr::Frame frame(0); SymbolTable symbol_table; @@ -448,7 +448,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *manager) { expr::Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -649,7 +649,7 @@ struct PullPlanVector { struct PullPlan { explicit PullPlan(std::shared_ptr plan, const Parameters ¶meters, bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager = nullptr, + RequestRouterInterface *request_router = nullptr, // TriggerContextCollector *trigger_context_collector = nullptr, std::optional memory_limit = {}); std::optional Pull(AnyStream *stream, std::optional n, @@ -679,7 +679,7 @@ struct PullPlan { PullPlan::PullPlan(const std::shared_ptr plan, const Parameters ¶meters, const bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager, const std::optional memory_limit) + RequestRouterInterface *request_router, const std::optional memory_limit) : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), @@ -688,14 +688,14 @@ PullPlan::PullPlan(const std::shared_ptr plan, const Parameters &par ctx_.symbol_table = plan->symbol_table(); ctx_.evaluation_context.timestamp = QueryTimestamp(); ctx_.evaluation_context.parameters = parameters; - ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, shard_request_manager); - ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, shard_request_manager); + ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, request_router); + ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, request_router); if (interpreter_context->config.execution_timeout_sec > 0) { ctx_.timer = utils::AsyncTimer{interpreter_context->config.execution_timeout_sec}; } ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_profile_query = is_profile_query; - ctx_.shard_request_manager = shard_request_manager; + ctx_.request_router = request_router; ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc; } @@ -804,7 +804,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_ auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; auto query_io = interpreter_context_->io.ForkLocal(random_uuid); - shard_request_manager_ = std::make_unique>( + request_router_ = std::make_unique>( coordinator::CoordinatorClient( query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), std::move(query_io)); @@ -881,7 +881,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, std::vector *notifications, - ShardRequestManagerInterface *shard_request_manager) { + RequestRouterInterface *request_router) { // TriggerContextCollector *trigger_context_collector = nullptr) { auto *cypher_query = utils::Downcast(parsed_query.query); @@ -890,8 +890,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapmemory_limit_, cypher_query->memory_scale_); if (memory_limit) { @@ -906,9 +905,9 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::mapplan_cache : nullptr, shard_request_manager); + auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, + parsed_query.parameters, + parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); summary->insert_or_assign("cost_estimate", plan->cost()); auto rw_type_checker = plan::ReadWriteTypeChecker(); @@ -927,7 +926,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map(plan, parsed_query.parameters, false, dba, interpreter_context, - execution_memory, shard_request_manager, memory_limit); + execution_memory, request_router, memory_limit); // execution_memory, trigger_context_collector, memory_limit); return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( @@ -941,8 +940,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, - InterpreterContext *interpreter_context, - ShardRequestManagerInterface *shard_request_manager, + InterpreterContext *interpreter_context, RequestRouterInterface *request_router, utils::MemoryResource *execution_memory) { const std::string kExplainQueryStart = "explain "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart), @@ -961,20 +959,20 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map(parsed_inner_query.query); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN"); - auto cypher_query_plan = CypherQueryToPlan( - parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, - parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, - shard_request_manager); + auto cypher_query_plan = + CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), + cypher_query, parsed_inner_query.parameters, + parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); std::stringstream printed_plan; - plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan); + plan::PrettyPrint(*request_router, &cypher_query_plan->plan(), &printed_plan); std::vector> printed_plan_rows; for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) { printed_plan_rows.push_back(std::vector{TypedValue(row)}); } - summary->insert_or_assign("explain", plan::PlanToJson(*shard_request_manager, &cypher_query_plan->plan()).dump()); + summary->insert_or_assign("explain", plan::PlanToJson(*request_router, &cypher_query_plan->plan()).dump()); return PreparedQuery{{"QUERY PLAN"}, std::move(parsed_query.required_privileges), @@ -991,7 +989,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *shard_request_manager = nullptr) { + RequestRouterInterface *request_router = nullptr) { const std::string kProfileQueryStart = "profile "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart), @@ -1034,22 +1032,21 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra EvaluationContext evaluation_context; evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parsed_inner_query.parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager, - storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); const auto memory_limit = expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_); - auto cypher_query_plan = CypherQueryToPlan( - parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, - parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, - shard_request_manager); + auto cypher_query_plan = + CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), + cypher_query, parsed_inner_query.parameters, + parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router); auto rw_type_checker = plan::ReadWriteTypeChecker(); rw_type_checker.InferRWType(const_cast(cypher_query_plan->plan())); return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"}, std::move(parsed_query.required_privileges), [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), - summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager, + summary, dba, interpreter_context, execution_memory, memory_limit, request_router, // We want to execute the query we are profiling lazily, so we delay // the construction of the corresponding context. stats_and_total_time = std::optional{}, @@ -1058,7 +1055,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra // No output symbols are given so that nothing is streamed. if (!stats_and_total_time) { stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context, - execution_memory, shard_request_manager, memory_limit) + execution_memory, request_router, memory_limit) .Pull(stream, {}, {}, summary); pull_plan = std::make_shared(ProfilingStatsToTable(*stats_and_total_time)); } @@ -1185,7 +1182,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *manager) { if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } @@ -1221,7 +1218,7 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, std::vector *notifications, InterpreterContext *interpreter_context, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *manager) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } @@ -1317,7 +1314,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli } PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, - ShardRequestManagerInterface *manager) { + RequestRouterInterface *manager) { if (in_explicit_transaction) { throw SettingConfigInMulticommandTxException{}; } @@ -1521,7 +1518,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (!in_explicit_transaction_ && (utils::Downcast(parsed_query.query) || utils::Downcast(parsed_query.query) || utils::Downcast(parsed_query.query))) { - shard_request_manager_->StartTransaction(); + request_router_->StartTransaction(); } utils::Timer planning_timer; @@ -1530,14 +1527,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, &*execution_db_accessor_, &query_execution->execution_memory, - &query_execution->notifications, shard_request_manager_.get()); + &query_execution->notifications, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, - &*shard_request_manager_, &query_execution->execution_memory_with_exception); + &*request_router_, &query_execution->execution_memory_with_exception); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareProfileQuery( - std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, - &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); + prepared_query = PrepareProfileQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, + interpreter_context_, &*execution_db_accessor_, + &query_execution->execution_memory_with_exception, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_, &query_execution->execution_memory); @@ -1545,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = PrepareAuthQuery( - std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, - &*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); + prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, + interpreter_context_, &*execution_db_accessor_, + &query_execution->execution_memory_with_exception, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, interpreter_context_->db, @@ -1558,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, - interpreter_context_, shard_request_manager_.get()); + interpreter_context_, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, &*execution_db_accessor_); @@ -1575,8 +1572,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string, prepared_query = PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); } else if (utils::Downcast(parsed_query.query)) { - prepared_query = - PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get()); + prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, request_router_.get()); } else if (utils::Downcast(parsed_query.query)) { prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); } else if (utils::Downcast(parsed_query.query)) { @@ -1617,7 +1613,7 @@ void Interpreter::Commit() { // For now, we will not check if there are some unfinished queries. // We should document clearly that all results should be pulled to complete // a query. - shard_request_manager_->Commit(); + request_router_->Commit(); if (!db_accessor_) return; const auto reset_necessary_members = [this]() { diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index afc298a0c..985c9a90c 100644 --- a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -296,7 +296,7 @@ class Interpreter final { */ void Abort(); - const ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } + const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); } private: struct QueryExecution { @@ -342,7 +342,7 @@ class Interpreter final { // move this unique_ptr into a shared_ptr. std::unique_ptr db_accessor_; std::optional execution_db_accessor_; - std::unique_ptr shard_request_manager_; + std::unique_ptr request_router_; bool in_explicit_transaction_{false}; bool expect_rollback_{false}; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 30db1012e..2d2849f0d 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -39,8 +39,8 @@ #include "query/v2/frontend/ast/ast.hpp" #include "query/v2/path.hpp" #include "query/v2/plan/scoped_profile.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/conversions.hpp" #include "storage/v3/property_value.hpp" #include "utils/algorithm.hpp" @@ -177,7 +177,7 @@ class DistributedCreateNodeCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateNode"); if (input_cursor_->Pull(frame, context)) { - auto &shard_manager = context.shard_request_manager; + auto &shard_manager = context.request_router; { SCOPED_REQUEST_WAIT_PROFILE; shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); @@ -197,8 +197,8 @@ class DistributedCreateNodeCursor : public Cursor { // TODO(kostasrim) Make this work with batching const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])}; - frame[nodes_info_.front()->symbol] = TypedValue( - query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager)); + frame[nodes_info_.front()->symbol] = + TypedValue(query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.request_router)); } std::vector NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { @@ -218,7 +218,7 @@ class DistributedCreateNodeCursor : public Cursor { if (const auto *node_info_properties = std::get_if(&node_info->properties)) { for (const auto &[key, value_expression] : *node_info_properties) { TypedValue val = value_expression->Accept(evaluator); - if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) { + if (context.request_router->IsPrimaryKey(primary_label, key)) { rqst.primary_key.push_back(TypedValueToValue(val)); pk.push_back(TypedValueToValue(val)); } @@ -227,8 +227,8 @@ class DistributedCreateNodeCursor : public Cursor { auto property_map = evaluator.Visit(*std::get(node_info->properties)).ValueMap(); for (const auto &[key, value] : property_map) { auto key_str = std::string(key); - auto property_id = context.shard_request_manager->NameToProperty(key_str); - if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) { + auto property_id = context.request_router->NameToProperty(key_str); + if (context.request_router->IsPrimaryKey(primary_label, property_id)) { rqst.primary_key.push_back(TypedValueToValue(value)); pk.push_back(TypedValueToValue(value)); } @@ -386,7 +386,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { + bool MakeRequest(RequestRouterInterface &shard_manager, ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; current_batch = shard_manager.Request(request_state_); @@ -398,7 +398,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - auto &shard_manager = *context.shard_request_manager; + auto &shard_manager = *context.request_router; while (true) { if (MustAbort(context)) { throw HintedAbortError(); @@ -696,7 +696,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) { // Like all filters, newly set values should not affect filtering of old // nodes and edges. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager, + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, storage::v3::View::OLD); while (input_cursor_->Pull(frame, context)) { if (EvaluateFilter(evaluator, self_.expression_)) return true; @@ -737,8 +737,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) { if (input_cursor_->Pull(frame, context)) { // Produce should always yield the latest results. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator); return true; @@ -1122,8 +1122,8 @@ class AggregateCursor : public Cursor { * aggregation results, and not on the number of inputs. */ void ProcessAll(Frame *frame, ExecutionContext *context) { - ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, - context->shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->request_router, + storage::v3::View::NEW); while (input_cursor_->Pull(*frame, *context)) { ProcessOne(*frame, &evaluator); } @@ -1343,8 +1343,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) { // First successful pull from the input, evaluate the skip expression. // The skip expression doesn't contain identifiers so graph view // parameter is not important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue to_skip = self_.expression_->Accept(evaluator); if (to_skip.type() != TypedValue::Type::Int) throw QueryRuntimeException("Number of elements to skip must be an integer."); @@ -1398,8 +1398,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) { if (limit_ == -1) { // Limit expression doesn't contain identifiers so graph view is not // important. - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue limit = self_.expression_->Accept(evaluator); if (limit.type() != TypedValue::Type::Int) throw QueryRuntimeException("Limit on number of returned elements must be an integer."); @@ -1454,8 +1454,8 @@ class OrderByCursor : public Cursor { SCOPED_PROFILE_OP("OrderBy"); if (!did_pull_all_) { - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); auto *mem = cache_.get_allocator().GetMemoryResource(); while (input_cursor_->Pull(frame, context)) { // collect the order_by elements @@ -1712,8 +1712,8 @@ class UnwindCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) return false; // successful pull from input, initialize value and iterator - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::OLD); TypedValue input_value = self_.input_expression_->Accept(evaluator); if (input_value.type() != TypedValue::Type::List) throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); @@ -2224,7 +2224,7 @@ class LoadCsvCursor : public Cursor { Frame frame(0); SymbolTable symbol_table; auto evaluator = - ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD); + ExpressionEvaluator(&frame, symbol_table, eval_context, context.request_router, storage::v3::View::OLD); auto maybe_file = ToOptionalString(&evaluator, self_->file_); auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); @@ -2261,8 +2261,8 @@ class ForeachCursor : public Cursor { return false; } - ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, - context.shard_request_manager, storage::v3::View::NEW); + ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router, + storage::v3::View::NEW); TypedValue expr_result = expression->Accept(evaluator); if (expr_result.IsNull()) { @@ -2338,7 +2338,7 @@ class DistributedCreateExpandCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) { return false; } - auto &shard_manager = context.shard_request_manager; + auto &shard_manager = context.request_router; ResetExecutionState(); { SCOPED_REQUEST_WAIT_PROFILE; @@ -2379,7 +2379,7 @@ class DistributedCreateExpandCursor : public Cursor { // handle parameter auto property_map = evaluator.Visit(*std::get(edge_info.properties)).ValueMap(); for (const auto &[property, value] : property_map) { - const auto property_id = context.shard_request_manager->NameToProperty(std::string(property)); + const auto property_id = context.request_router->NameToProperty(std::string(property)); request.properties.emplace_back(property_id, storage::v3::TypedValueToValue(value)); } } @@ -2394,7 +2394,7 @@ class DistributedCreateExpandCursor : public Cursor { const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) { vertex_id.first = vertex.PrimaryLabel(); for (const auto &[key, val] : vertex.Properties()) { - if (context.shard_request_manager->IsPrimaryKey(vertex_id.first.id, key)) { + if (context.request_router->IsPrimaryKey(vertex_id.first.id, key)) { vertex_id.second.push_back(val); } } @@ -2474,11 +2474,11 @@ class DistributedExpandCursor : public Cursor { request.src_vertices.push_back(get_dst_vertex(edge, direction)); request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; ExecutionState request_state; - auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); + auto result_rows = context.request_router->Request(request_state, std::move(request)); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); frame[self_.common_.node_symbol] = accessors::VertexAccessor( - msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager); + msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router); } bool InitEdges(Frame &frame, ExecutionContext &context) { @@ -2502,7 +2502,7 @@ class DistributedExpandCursor : public Cursor { ExecutionState request_state; auto result_rows = std::invoke([&context, &request_state, &request]() mutable { SCOPED_REQUEST_WAIT_PROFILE; - return context.shard_request_manager->Request(request_state, std::move(request)); + return context.request_router->Request(request_state, std::move(request)); }); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); @@ -2525,14 +2525,14 @@ class DistributedExpandCursor : public Cursor { case EdgeAtom::Direction::IN: { for (auto &edge : edge_messages) { edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}, - context.shard_request_manager); + context.request_router); } break; } case EdgeAtom::Direction::OUT: { for (auto &edge : edge_messages) { edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}, - context.shard_request_manager); + context.request_router); } break; } diff --git a/src/query/v2/plan/pretty_print.cpp b/src/query/v2/plan/pretty_print.cpp index b7ab6da6e..0e8d09e2b 100644 --- a/src/query/v2/plan/pretty_print.cpp +++ b/src/query/v2/plan/pretty_print.cpp @@ -14,12 +14,12 @@ #include "query/v2/bindings/pretty_print.hpp" #include "query/v2/db_accessor.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/string.hpp" namespace memgraph::query::v2::plan { -PlanPrinter::PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out) +PlanPrinter::PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out) : request_manager_(request_manager), out_(out) {} #define PRE_VISIT(TOp) \ @@ -263,14 +263,13 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string --depth_; } -void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, - std::ostream *out) { +void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out) { PlanPrinter printer(&request_manager, out); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(printer); } -nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { +nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) { impl::PlanToJsonVisitor visitor(&request_manager); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(visitor); @@ -349,15 +348,15 @@ json ToJson(const utils::Bound &bound) { json ToJson(const Symbol &symbol) { return symbol.name(); } -json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager) { return request_manager.EdgeTypeToName(edge_type); } -json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager) { return request_manager.LabelToName(label); } -json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager) { return request_manager.PropertyToName(property); } @@ -369,7 +368,7 @@ json ToJson(NamedExpression *nexpr) { } json ToJson(const std::vector> &properties, - const ShardRequestManagerInterface &request_manager) { + const RequestRouterInterface &request_manager) { json json; for (const auto &prop_pair : properties) { json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second)); @@ -377,7 +376,7 @@ json ToJson(const std::vector> return json; } -json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager) { +json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager) { json self; self["symbol"] = ToJson(node_info.symbol); self["labels"] = ToJson(node_info.labels, request_manager); @@ -386,7 +385,7 @@ json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterfac return self; } -json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager) { +json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager) { json self; self["symbol"] = ToJson(edge_info.symbol); const auto *props = std::get_if(&edge_info.properties); diff --git a/src/query/v2/plan/pretty_print.hpp b/src/query/v2/plan/pretty_print.hpp index 8485723a3..7e97de3ff 100644 --- a/src/query/v2/plan/pretty_print.hpp +++ b/src/query/v2/plan/pretty_print.hpp @@ -18,7 +18,7 @@ #include "query/v2/frontend/ast/ast.hpp" #include "query/v2/plan/operator.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" namespace memgraph::query::v2 { @@ -27,20 +27,19 @@ namespace plan { class LogicalOperator; /// Pretty print a `LogicalOperator` plan to a `std::ostream`. -/// ShardRequestManager is needed for resolving label and property names. +/// RequestRouter is needed for resolving label and property names. /// Note that `plan_root` isn't modified, but we can't take it as a const /// because we don't have support for visiting a const LogicalOperator. -void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, - std::ostream *out); +void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out); /// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`. -inline void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { +inline void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) { PrettyPrint(request_manager, plan_root, &std::cout); } /// Convert a `LogicalOperator` plan to a JSON representation. /// DbAccessor is needed for resolving label and property names. -nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root); +nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root); class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { public: @@ -48,7 +47,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::Visit; - PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out); + PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out); bool DefaultPreVisit() override; @@ -115,7 +114,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { void Branch(LogicalOperator &op, const std::string &branch_name = ""); int64_t depth_{0}; - const ShardRequestManagerInterface *request_manager_{nullptr}; + const RequestRouterInterface *request_manager_{nullptr}; std::ostream *out_{nullptr}; }; @@ -133,20 +132,20 @@ nlohmann::json ToJson(const utils::Bound &bound); nlohmann::json ToJson(const Symbol &symbol); -nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager); -nlohmann::json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager); -nlohmann::json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager); nlohmann::json ToJson(NamedExpression *nexpr); nlohmann::json ToJson(const std::vector> &properties, - const ShardRequestManagerInterface &request_manager); + const RequestRouterInterface &request_manager); -nlohmann::json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager); -nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager); nlohmann::json ToJson(const Aggregate::Element &elem); @@ -161,7 +160,7 @@ nlohmann::json ToJson(const std::vector &items, Args &&...args) { class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { public: - explicit PlanToJsonVisitor(const ShardRequestManagerInterface *request_manager) : request_manager_(request_manager) {} + explicit PlanToJsonVisitor(const RequestRouterInterface *request_manager) : request_manager_(request_manager) {} using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; @@ -217,7 +216,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { protected: nlohmann::json output_; - const ShardRequestManagerInterface *request_manager_; + const RequestRouterInterface *request_manager_; nlohmann::json PopOutput() { nlohmann::json tmp; diff --git a/src/query/v2/plan/rule_based_planner.hpp b/src/query/v2/plan/rule_based_planner.hpp index 884693c2f..2b6857aea 100644 --- a/src/query/v2/plan/rule_based_planner.hpp +++ b/src/query/v2/plan/rule_based_planner.hpp @@ -272,7 +272,7 @@ class RuleBasedPlanner { PropertiesMapList vector_props; vector_props.reserve(node_properties->size()); for (const auto &kv : *node_properties) { - // TODO(kostasrim) GetProperty should be implemented in terms of ShardRequestManager NameToProperty + // TODO(kostasrim) GetProperty should be implemented in terms of RequestRouter NameToProperty vector_props.push_back({GetProperty(kv.first), kv.second}); } return std::move(vector_props); diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index 47a10ba3e..e68ce1220 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -15,7 +15,7 @@ #include #include "query/v2/bindings/typed_value.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/conversions.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/property_value.hpp" @@ -29,11 +29,11 @@ namespace memgraph::query::v2::plan { template class VertexCountCache { public: - explicit VertexCountCache(TDbAccessor *shard_request_manager) : shard_request_manager_{shard_request_manager} {} + explicit VertexCountCache(TDbAccessor *request_router) : request_router_{request_router} {} - auto NameToLabel(const std::string &name) { return shard_request_manager_->NameToLabel(name); } - auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); } - auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); } + auto NameToLabel(const std::string &name) { return request_router_->NameToLabel(name); } + auto NameToProperty(const std::string &name) { return request_router_->NameToProperty(name); } + auto NameToEdgeType(const std::string &name) { return request_router_->NameToEdgeType(name); } int64_t VerticesCount() { return 1; } @@ -53,11 +53,11 @@ class VertexCountCache { } // For now return true if label is primary label - bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); } + bool LabelIndexExists(storage::v3::LabelId label) { return request_router_->IsPrimaryLabel(label); } bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } - ShardRequestManagerInterface *shard_request_manager_; + RequestRouterInterface *request_router_; }; template diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/request_router.hpp similarity index 96% rename from src/query/v2/shard_request_manager.hpp rename to src/query/v2/request_router.hpp index 68a10f384..86aae6bcc 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/request_router.hpp @@ -83,10 +83,10 @@ struct ExecutionState { // CompoundKey is optional because some operators require to iterate over all the available keys // of a shard. One example is ScanAll, where we only require the field label. std::optional key; - // Transaction id to be filled by the ShardRequestManager implementation + // Transaction id to be filled by the RequestRouter implementation coordinator::Hlc transaction_id; - // Initialized by ShardRequestManager implementation. This vector is filled with the shards that - // the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that + // Initialized by RequestRouter implementation. This vector is filled with the shards that + // the RequestRouter impl will send requests to. When a request to a shard exhausts it, meaning that // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes // empty, it means that all of the requests have completed succefully. // TODO(gvolfing) @@ -101,16 +101,16 @@ struct ExecutionState { State state = INITIALIZING; }; -class ShardRequestManagerInterface { +class RequestRouterInterface { public: using VertexAccessor = query::v2::accessors::VertexAccessor; - ShardRequestManagerInterface() = default; - ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; - ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete; + RequestRouterInterface() = default; + RequestRouterInterface(const RequestRouterInterface &) = delete; + RequestRouterInterface(RequestRouterInterface &&) = delete; + RequestRouterInterface &operator=(const RequestRouterInterface &) = delete; + RequestRouterInterface &&operator=(RequestRouterInterface &&) = delete; - virtual ~ShardRequestManagerInterface() = default; + virtual ~RequestRouterInterface() = default; virtual void StartTransaction() = 0; virtual void Commit() = 0; @@ -134,7 +134,7 @@ class ShardRequestManagerInterface { // TODO(kostasrim)rename this class template template -class ShardRequestManager : public ShardRequestManagerInterface { +class RequestRouter : public RequestRouterInterface { public: using StorageClient = coordinator::RsmClient; @@ -145,15 +145,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { using ShardMap = coordinator::ShardMap; using CompoundKey = coordinator::PrimaryKey; using VertexAccessor = query::v2::accessors::VertexAccessor; - ShardRequestManager(CoordinatorClient coord, io::Io &&io) - : coord_cli_(std::move(coord)), io_(std::move(io)) {} + RequestRouter(CoordinatorClient coord, io::Io &&io) : coord_cli_(std::move(coord)), io_(std::move(io)) {} - ShardRequestManager(const ShardRequestManager &) = delete; - ShardRequestManager(ShardRequestManager &&) = delete; - ShardRequestManager &operator=(const ShardRequestManager &) = delete; - ShardRequestManager &operator=(ShardRequestManager &&) = delete; + RequestRouter(const RequestRouter &) = delete; + RequestRouter(RequestRouter &&) = delete; + RequestRouter &operator=(const RequestRouter &) = delete; + RequestRouter &operator=(RequestRouter &&) = delete; - ~ShardRequestManager() override {} + ~RequestRouter() override {} void StartTransaction() override { coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; diff --git a/tests/simulation/shard_request_manager.cpp b/tests/simulation/request_router.cpp similarity index 94% rename from tests/simulation/shard_request_manager.cpp rename to tests/simulation/request_router.cpp index 9db18659e..ddd9351ae 100644 --- a/tests/simulation/shard_request_manager.cpp +++ b/tests/simulation/request_router.cpp @@ -31,8 +31,8 @@ #include "io/simulator/simulator_transport.hpp" #include "query/v2/accessors.hpp" #include "query/v2/conversions.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "storage/v3/property_value.hpp" #include "utils/result.hpp" @@ -151,7 +151,7 @@ void RunStorageRaft(Raft state{.label = "test_label"}; auto result = io.Request(state); @@ -171,7 +171,7 @@ void TestScanVertices(query::v2::ShardRequestManagerInterface &io) { } } -void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { +void TestCreateVertices(query::v2::RequestRouterInterface &io) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_vertices; @@ -187,7 +187,7 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { MG_ASSERT(result.size() == 2); } -void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { +void TestCreateExpand(query::v2::RequestRouterInterface &io) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_expands; @@ -209,20 +209,20 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { MG_ASSERT(responses[1].success); } -void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestExpandOne(query::v2::RequestRouterInterface &request_router) { msgs::ExecutionState state{}; msgs::ExpandOneRequest request; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.direction = msgs::EdgeDirection::BOTH; - auto result_rows = shard_request_manager.Request(state, std::move(request)); + auto result_rows = request_router.Request(state, std::move(request)); MG_ASSERT(result_rows.size() == 2); } -template -void TestAggregate(ShardRequestManager &io) {} +template +void TestAggregate(RequestRouter &io) {} void DoTest() { SimulatorConfig config{ @@ -337,7 +337,7 @@ void DoTest() { // also get the current shard map CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); - query::v2::ShardRequestManager io(std::move(coordinator_client), std::move(cli_io)); + query::v2::RequestRouter io(std::move(coordinator_client), std::move(cli_io)); io.StartTransaction(); TestScanVertices(io); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index fd62b99f9..99f617cda 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -30,8 +30,8 @@ #include "io/simulator/simulator_transport.hpp" #include "machine_manager/machine_config.hpp" #include "machine_manager/machine_manager.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "testing_constants.hpp" #include "utils/print_helpers.hpp" #include "utils/variant_helpers.hpp" @@ -151,8 +151,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) { return sm; } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, CreateVertex create_vertex) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -166,7 +166,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request query::v2::ExecutionState state; - auto label_id = shard_request_manager.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex nv{.primary_key = primary_key}; nv.label_ids.push_back({label_id}); @@ -174,7 +174,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request std::vector new_vertices; new_vertices.push_back(std::move(nv)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); RC_ASSERT(result.size() == 1); RC_ASSERT(!result[0].error.has_value()); @@ -182,11 +182,11 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, ScanAll scan_all) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + ScanAll scan_all) { query::v2::ExecutionState request{.label = "test_label"}; - auto results = shard_request_manager.Request(request); + auto results = request_router.Request(request); RC_ASSERT(results.size() == correctness_model.size()); @@ -247,15 +247,14 @@ std::pair RunClusterSimulation(const CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); WaitForShardsToInitialize(coordinator_client); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); + request_router.StartTransaction(); auto correctness_model = std::set{}; for (const Op &op : ops) { - std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner); + std::visit([&](auto &o) { ExecuteOp(request_router, correctness_model, o); }, op.inner); } // We have now completed our workload without failing any assertions, so we can diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp index e586a3556..22af9c702 100644 --- a/tests/unit/high_density_shard_create_scan.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -29,8 +29,8 @@ #include "io/simulator/simulator_transport.hpp" #include "machine_manager/machine_config.hpp" #include "machine_manager/machine_manager.hpp" +#include "query/v2/request_router.hpp" #include "query/v2/requests.hpp" -#include "query/v2/shard_request_manager.hpp" #include "utils/variant_helpers.hpp" namespace memgraph::tests::simulation { @@ -161,8 +161,8 @@ ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards return sm; } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, CreateVertex create_vertex) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -176,7 +176,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man query::v2::ExecutionState state; - auto label_id = shard_request_manager.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex nv{.primary_key = primary_key}; nv.label_ids.push_back({label_id}); @@ -184,7 +184,7 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man std::vector new_vertices; new_vertices.push_back(std::move(nv)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); MG_ASSERT(result.size() == 1); MG_ASSERT(!result[0].error.has_value()); @@ -192,11 +192,11 @@ void ExecuteOp(query::v2::ShardRequestManager &shard_request_man correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, - std::set &correctness_model, ScanAll scan_all) { +void ExecuteOp(query::v2::RequestRouter &request_router, std::set &correctness_model, + ScanAll scan_all) { query::v2::ExecutionState request{.label = "test_label"}; - auto results = shard_request_manager.Request(request); + auto results = request_router.Request(request); MG_ASSERT(results.size() == correctness_model.size()); @@ -245,23 +245,22 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op WaitForShardsToInitialize(coordinator_client); auto time_after_shard_stabilization = cli_io_2.Now(); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); + request_router.StartTransaction(); auto correctness_model = std::set{}; auto time_before_creates = cli_io_2.Now(); for (int i = 0; i < create_ops; i++) { - ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i}); + ExecuteOp(request_router, correctness_model, CreateVertex{.first = i, .second = i}); } auto time_after_creates = cli_io_2.Now(); for (int i = 0; i < scan_ops; i++) { - ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); + ExecuteOp(request_router, correctness_model, ScanAll{}); } auto time_after_scan = cli_io_2.Now(); diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 834523fee..0b081e5a1 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -27,7 +27,7 @@ #include #include #include "io/rsm/rsm_client.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/schemas.hpp" @@ -109,19 +109,19 @@ ShardMap TestShardMap() { return sm; } -template -void TestScanAll(ShardRequestManager &shard_request_manager) { +template +void TestScanAll(RequestRouter &request_router) { query::v2::ExecutionState state{.label = kLabelName}; - auto result = shard_request_manager.Request(state); + auto result = request_router.Request(state); EXPECT_EQ(result.size(), 2); } -void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; query::v2::ExecutionState state; std::vector new_vertices; - auto label_id = shard_request_manager.NameToLabel(kLabelName); + auto label_id = request_router.NameToLabel(kLabelName); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; @@ -129,18 +129,18 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_m new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); - auto result = shard_request_manager.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); EXPECT_EQ(result.size(), 1); EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message; } -void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; query::v2::ExecutionState state; std::vector new_expands; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; msgs::NewExpand expand_1{ @@ -150,27 +150,27 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_man new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_2)); - auto responses = shard_request_manager.Request(state, std::move(new_expands)); + auto responses = request_router.Request(state, std::move(new_expands)); MG_ASSERT(responses.size() == 1); MG_ASSERT(!responses[0].error.has_value()); } -void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { +void TestExpandOne(query::v2::RequestRouterInterface &request_router) { query::v2::ExecutionState state{}; msgs::ExpandOneRequest request; - const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); - const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.direction = msgs::EdgeDirection::BOTH; - auto result_rows = shard_request_manager.Request(state, std::move(request)); + auto result_rows = request_router.Request(state, std::move(request)); MG_ASSERT(result_rows.size() == 1); MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1); MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1); } -template -void TestAggregate(ShardRequestManager &shard_request_manager) {} +template +void TestAggregate(RequestRouter &request_router) {} MachineManager MkMm(LocalSystem &local_system, std::vector
coordinator_addresses, Address addr, ShardMap shard_map) { @@ -226,14 +226,13 @@ TEST(MachineManager, BasicFunctionality) { CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); - query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), - std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - shard_request_manager.StartTransaction(); - TestCreateVertices(shard_request_manager); - TestScanAll(shard_request_manager); - TestCreateExpand(shard_request_manager); - TestExpandOne(shard_request_manager); + request_router.StartTransaction(); + TestCreateVertices(request_router); + TestScanAll(request_router); + TestCreateExpand(request_router); + TestExpandOne(request_router); local_system.ShutDown(); }; From 9fc7f9dcedb42eb233eefc3ff3ca2aabba42ed59 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Mon, 28 Nov 2022 13:03:07 +0000 Subject: [PATCH 2/3] Standardize RequestRouter variable names as request_router --- src/query/v2/accessors.cpp | 25 ++--- src/query/v2/accessors.hpp | 13 +-- src/query/v2/bindings/eval.hpp | 4 +- src/query/v2/conversions.hpp | 10 +- src/query/v2/cypher_query_interpreter.cpp | 8 +- src/query/v2/cypher_query_interpreter.hpp | 4 +- .../interpret/awesome_memgraph_functions.hpp | 2 +- src/query/v2/interpreter.cpp | 26 +++--- src/query/v2/plan/operator.cpp | 19 ++-- src/query/v2/plan/pretty_print.cpp | 92 +++++++++---------- src/query/v2/plan/pretty_print.hpp | 28 +++--- tests/simulation/request_router.cpp | 32 +++---- 12 files changed, 133 insertions(+), 130 deletions(-) diff --git a/src/query/v2/accessors.cpp b/src/query/v2/accessors.cpp index a28df0fe5..2eedfcf4d 100644 --- a/src/query/v2/accessors.cpp +++ b/src/query/v2/accessors.cpp @@ -15,15 +15,15 @@ #include "storage/v3/id_types.hpp" namespace memgraph::query::v2::accessors { -EdgeAccessor::EdgeAccessor(Edge edge, const RequestRouterInterface *manager) - : edge(std::move(edge)), manager_(manager) {} +EdgeAccessor::EdgeAccessor(Edge edge, const RequestRouterInterface *request_router) + : edge(std::move(edge)), request_router_(request_router) {} EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; } const std::vector> &EdgeAccessor::Properties() const { return edge.properties; } Value EdgeAccessor::GetProperty(const std::string &prop_name) const { - auto prop_id = manager_->NameToProperty(prop_name); + auto prop_id = request_router_->NameToProperty(prop_name); auto it = std::find_if(edge.properties.begin(), edge.properties.end(), [&](auto &pr) { return prop_id == pr.first; }); if (it == edge.properties.end()) { return {}; @@ -36,19 +36,20 @@ const Edge &EdgeAccessor::GetEdge() const { return edge; } bool EdgeAccessor::IsCycle() const { return edge.src == edge.dst; }; VertexAccessor EdgeAccessor::To() const { - return VertexAccessor(Vertex{edge.dst}, std::vector>{}, manager_); + return VertexAccessor(Vertex{edge.dst}, std::vector>{}, request_router_); } VertexAccessor EdgeAccessor::From() const { - return VertexAccessor(Vertex{edge.src}, std::vector>{}, manager_); + return VertexAccessor(Vertex{edge.src}, std::vector>{}, request_router_); } VertexAccessor::VertexAccessor(Vertex v, std::vector> props, - const RequestRouterInterface *manager) - : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} + const RequestRouterInterface *request_router) + : vertex(std::move(v)), properties(std::move(props)), request_router_(request_router) {} -VertexAccessor::VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *manager) - : vertex(std::move(v)), manager_(manager) { +VertexAccessor::VertexAccessor(Vertex v, std::map &&props, + const RequestRouterInterface *request_router) + : vertex(std::move(v)), request_router_(request_router) { properties.reserve(props.size()); for (auto &[id, value] : props) { properties.emplace_back(std::make_pair(id, std::move(value))); @@ -56,8 +57,8 @@ VertexAccessor::VertexAccessor(Vertex v, std::map &&props, co } VertexAccessor::VertexAccessor(Vertex v, const std::map &props, - const RequestRouterInterface *manager) - : vertex(std::move(v)), manager_(manager) { + const RequestRouterInterface *request_router) + : vertex(std::move(v)), request_router_(request_router) { properties.reserve(props.size()); for (const auto &[id, value] : props) { properties.emplace_back(std::make_pair(id, value)); @@ -87,7 +88,7 @@ Value VertexAccessor::GetProperty(PropertyId prop_id) const { // NOLINTNEXTLINE(readability-convert-member-functions-to-static) Value VertexAccessor::GetProperty(const std::string &prop_name) const { - return GetProperty(manager_->NameToProperty(prop_name)); + return GetProperty(request_router_->NameToProperty(prop_name)); } msgs::Vertex VertexAccessor::GetVertex() const { return vertex; } diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index b5585953f..db22d29cc 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -41,7 +41,7 @@ class VertexAccessor; class EdgeAccessor final { public: - explicit EdgeAccessor(Edge edge, const RequestRouterInterface *manager); + explicit EdgeAccessor(Edge edge, const RequestRouterInterface *request_router); [[nodiscard]] EdgeTypeId EdgeType() const; @@ -69,7 +69,7 @@ class EdgeAccessor final { private: Edge edge; - const RequestRouterInterface *manager_; + const RequestRouterInterface *request_router_; }; class VertexAccessor final { @@ -77,10 +77,11 @@ class VertexAccessor final { using PropertyId = msgs::PropertyId; using Label = msgs::Label; using VertexId = msgs::VertexId; - VertexAccessor(Vertex v, std::vector> props, const RequestRouterInterface *manager); + VertexAccessor(Vertex v, std::vector> props, + const RequestRouterInterface *request_router); - VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *manager); - VertexAccessor(Vertex v, const std::map &props, const RequestRouterInterface *manager); + VertexAccessor(Vertex v, std::map &&props, const RequestRouterInterface *request_router); + VertexAccessor(Vertex v, const std::map &props, const RequestRouterInterface *request_router); [[nodiscard]] Label PrimaryLabel() const; @@ -149,7 +150,7 @@ class VertexAccessor final { private: Vertex vertex; std::vector> properties; - const RequestRouterInterface *manager_; + const RequestRouterInterface *request_router_; }; // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); } diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp index 912850e87..c7f52f201 100644 --- a/src/query/v2/bindings/eval.hpp +++ b/src/query/v2/bindings/eval.hpp @@ -35,8 +35,8 @@ class Callable { auto operator()(const storage::v3::PropertyValue &val) const { return storage::v3::PropertyToTypedValue(val); }; - auto operator()(const msgs::Value &val, RequestRouterInterface *manager) const { - return ValueToTypedValue(val, manager); + auto operator()(const msgs::Value &val, RequestRouterInterface *request_router) const { + return ValueToTypedValue(val, request_router); }; }; diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index 161f89979..ac052327d 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -17,7 +17,7 @@ namespace memgraph::query::v2 { -inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInterface *manager) { +inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInterface *request_router) { using Value = msgs::Value; switch (value.type) { case Value::Type::Null: @@ -35,7 +35,7 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInter std::vector dst; dst.reserve(lst.size()); for (const auto &elem : lst) { - dst.push_back(ValueToTypedValue(elem, manager)); + dst.push_back(ValueToTypedValue(elem, request_router)); } return TypedValue(std::move(dst)); } @@ -43,15 +43,15 @@ inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInter const auto &value_map = value.map_v; std::map dst; for (const auto &[key, val] : value_map) { - dst[key] = ValueToTypedValue(val, manager); + dst[key] = ValueToTypedValue(val, request_router); } return TypedValue(std::move(dst)); } case Value::Type::Vertex: return TypedValue(accessors::VertexAccessor( - value.vertex_v, std::vector>{}, manager)); + value.vertex_v, std::vector>{}, request_router)); case Value::Type::Edge: - return TypedValue(accessors::EdgeAccessor(value.edge_v, manager)); + return TypedValue(accessors::EdgeAccessor(value.edge_v, request_router)); } throw std::runtime_error("Incorrect type in conversion"); } diff --git a/src/query/v2/cypher_query_interpreter.cpp b/src/query/v2/cypher_query_interpreter.cpp index 880165ad8..f3f8e48d7 100644 --- a/src/query/v2/cypher_query_interpreter.cpp +++ b/src/query/v2/cypher_query_interpreter.cpp @@ -118,9 +118,9 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - RequestRouterInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers) { - auto vertex_counts = plan::MakeVertexCountCache(shard_manager); + auto vertex_counts = plan::MakeVertexCountCache(request_router); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); auto planning_context = plan::MakePlanningContext(&ast_storage, &symbol_table, query, &vertex_counts); auto [root, cost] = plan::MakeLogicalPlan(&planning_context, parameters, FLAGS_query_cost_planner); @@ -130,7 +130,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - RequestRouterInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers) { std::optional::Accessor> plan_cache_access; if (plan_cache) { @@ -146,7 +146,7 @@ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_stor } auto plan = std::make_shared( - MakeLogicalPlan(std::move(ast_storage), query, parameters, shard_manager, predefined_identifiers)); + MakeLogicalPlan(std::move(ast_storage), query, parameters, request_router, predefined_identifiers)); if (plan_cache_access) { plan_cache_access->insert({hash, plan}); } diff --git a/src/query/v2/cypher_query_interpreter.hpp b/src/query/v2/cypher_query_interpreter.hpp index c9d65047c..688e52fed 100644 --- a/src/query/v2/cypher_query_interpreter.hpp +++ b/src/query/v2/cypher_query_interpreter.hpp @@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan { }; std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - RequestRouterInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers); /** @@ -145,7 +145,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery */ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - RequestRouterInterface *shard_manager, + RequestRouterInterface *request_router, const std::vector &predefined_identifiers = {}); } // namespace memgraph::query::v2 diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp index 35ed1d797..8ca3eacb3 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.hpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp @@ -34,7 +34,7 @@ const char kId[] = "ID"; struct FunctionContext { // TODO(kostasrim) consider optional here. RequestRouter does not exist on the storage. // DbAccessor *db_accessor; - RequestRouterInterface *manager; + RequestRouterInterface *request_router; utils::MemoryResource *memory; int64_t timestamp; std::unordered_map *counters; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 94ff7a8ff..1c2d6dadf 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -143,7 +143,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters, - RequestRouterInterface *manager) { + RequestRouterInterface *request_router) { // Empty frame for evaluation of password expression. This is OK since // password should be either null or string literal and it's evaluation // should not depend on frame. @@ -154,7 +154,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); std::string username = auth_query->user_; std::string rolename = auth_query->role_; @@ -312,7 +312,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, - InterpreterContext *interpreter_context, RequestRouterInterface *manager, + InterpreterContext *interpreter_context, RequestRouterInterface *request_router, std::vector *notifications) { expr::Frame frame(0); SymbolTable symbol_table; @@ -321,7 +321,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & // the argument to Callback. evaluation_context.timestamp = QueryTimestamp(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); Callback callback; switch (repl_query->action_) { @@ -448,7 +448,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, - RequestRouterInterface *manager) { + RequestRouterInterface *request_router) { expr::Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -458,7 +458,7 @@ Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶m std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()) .count(); evaluation_context.parameters = parameters; - ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, manager, storage::v3::View::OLD); + ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD); Callback callback; switch (setting_query->action_) { @@ -1182,14 +1182,14 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - RequestRouterInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } auto *auth_query = utils::Downcast(parsed_query.query); - auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, manager); + auto callback = HandleAuthQuery(auth_query, interpreter_context->auth, parsed_query.parameters, request_router); SymbolTable symbol_table; std::vector output_symbols; @@ -1218,14 +1218,14 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, std::vector *notifications, InterpreterContext *interpreter_context, - RequestRouterInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } auto *replication_query = utils::Downcast(parsed_query.query); - auto callback = - HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, manager, notifications); + auto callback = HandleReplicationQuery(replication_query, parsed_query.parameters, interpreter_context, + request_router, notifications); return PreparedQuery{callback.header, std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( @@ -1314,14 +1314,14 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli } PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, - RequestRouterInterface *manager) { + RequestRouterInterface *request_router) { if (in_explicit_transaction) { throw SettingConfigInMulticommandTxException{}; } auto *setting_query = utils::Downcast(parsed_query.query); MG_ASSERT(setting_query); - auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, manager); + auto callback = HandleSettingQuery(setting_query, parsed_query.parameters, request_router); return PreparedQuery{std::move(callback.header), std::move(parsed_query.required_privileges), [callback_fn = std::move(callback.fn), pull_plan = std::shared_ptr{nullptr}]( diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index 2d2849f0d..014be0d62 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -177,10 +177,10 @@ class DistributedCreateNodeCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP("CreateNode"); if (input_cursor_->Pull(frame, context)) { - auto &shard_manager = context.request_router; + auto &request_router = context.request_router; { SCOPED_REQUEST_WAIT_PROFILE; - shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); + request_router->Request(state_, NodeCreationInfoToRequest(context, frame)); } PlaceNodeOnTheFrame(frame, context); return true; @@ -386,10 +386,10 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(RequestRouterInterface &shard_manager, ExecutionContext &context) { + bool MakeRequest(RequestRouterInterface &request_router, ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; - current_batch = shard_manager.Request(request_state_); + current_batch = request_router.Request(request_state_); } current_vertex_it = current_batch.begin(); return !current_batch.empty(); @@ -398,7 +398,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { bool Pull(Frame &frame, ExecutionContext &context) override { SCOPED_PROFILE_OP(op_name_); - auto &shard_manager = *context.request_router; + auto &request_router = *context.request_router; while (true) { if (MustAbort(context)) { throw HintedAbortError(); @@ -411,10 +411,11 @@ class DistributedScanAllAndFilterCursor : public Cursor { } } - request_state_.label = label_.has_value() ? std::make_optional(shard_manager.LabelToName(*label_)) : std::nullopt; + request_state_.label = + label_.has_value() ? std::make_optional(request_router.LabelToName(*label_)) : std::nullopt; if (current_vertex_it == current_batch.end() && - (request_state_.state == State::COMPLETED || !MakeRequest(shard_manager, context))) { + (request_state_.state == State::COMPLETED || !MakeRequest(request_router, context))) { ResetExecutionState(); continue; } @@ -2338,11 +2339,11 @@ class DistributedCreateExpandCursor : public Cursor { if (!input_cursor_->Pull(frame, context)) { return false; } - auto &shard_manager = context.request_router; + auto &request_router = context.request_router; ResetExecutionState(); { SCOPED_REQUEST_WAIT_PROFILE; - shard_manager->Request(state_, ExpandCreationInfoToRequest(context, frame)); + request_router->Request(state_, ExpandCreationInfoToRequest(context, frame)); } return true; } diff --git a/src/query/v2/plan/pretty_print.cpp b/src/query/v2/plan/pretty_print.cpp index 0e8d09e2b..bc30a3890 100644 --- a/src/query/v2/plan/pretty_print.cpp +++ b/src/query/v2/plan/pretty_print.cpp @@ -19,8 +19,8 @@ namespace memgraph::query::v2::plan { -PlanPrinter::PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out) - : request_manager_(request_manager), out_(out) {} +PlanPrinter::PlanPrinter(const RequestRouterInterface *request_router, std::ostream *out) + : request_router_(request_router), out_(out) {} #define PRE_VISIT(TOp) \ bool PlanPrinter::PreVisit(TOp &) { \ @@ -34,7 +34,7 @@ bool PlanPrinter::PreVisit(CreateExpand &op) { WithPrintLn([&](auto &out) { out << "* CreateExpand (" << op.input_symbol_.name() << ")" << (op.edge_info_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" - << op.edge_info_.symbol.name() << ":" << request_manager_->EdgeTypeToName(op.edge_info_.edge_type) << "]" + << op.edge_info_.symbol.name() << ":" << request_router_->EdgeTypeToName(op.edge_info_.edge_type) << "]" << (op.edge_info_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.node_info_.symbol.name() << ")"; }); @@ -54,7 +54,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAll &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabel &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabel" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << ")"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << ")"; }); return true; } @@ -62,8 +62,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabel &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyValue &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelPropertyValue" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -71,8 +71,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyValue &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyRange &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelPropertyRange" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -80,8 +80,8 @@ bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelPropertyRange &op) { bool PlanPrinter::PreVisit(query::v2::plan::ScanAllByLabelProperty &op) { WithPrintLn([&](auto &out) { out << "* ScanAllByLabelProperty" - << " (" << op.output_symbol_.name() << " :" << request_manager_->LabelToName(op.label_) << " {" - << request_manager_->PropertyToName(op.property_) << "})"; + << " (" << op.output_symbol_.name() << " :" << request_router_->LabelToName(op.label_) << " {" + << request_router_->PropertyToName(op.property_) << "})"; }); return true; } @@ -100,7 +100,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::Expand &op) { << (op.common_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" << op.common_.edge_symbol.name(); utils::PrintIterable(*out_, op.common_.edge_types, "|", [this](auto &stream, const auto &edge_type) { - stream << ":" << request_manager_->EdgeTypeToName(edge_type); + stream << ":" << request_router_->EdgeTypeToName(edge_type); }); *out_ << "]" << (op.common_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.common_.node_symbol.name() << ")"; @@ -129,7 +129,7 @@ bool PlanPrinter::PreVisit(query::v2::plan::ExpandVariable &op) { << (op.common_.direction == query::v2::EdgeAtom::Direction::IN ? "<-" : "-") << "[" << op.common_.edge_symbol.name(); utils::PrintIterable(*out_, op.common_.edge_types, "|", [this](auto &stream, const auto &edge_type) { - stream << ":" << request_manager_->EdgeTypeToName(edge_type); + stream << ":" << request_router_->EdgeTypeToName(edge_type); }); *out_ << "]" << (op.common_.direction == query::v2::EdgeAtom::Direction::OUT ? "->" : "-") << "(" << op.common_.node_symbol.name() << ")"; @@ -263,14 +263,14 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string --depth_; } -void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out) { - PlanPrinter printer(&request_manager, out); +void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root, std::ostream *out) { + PlanPrinter printer(&request_router, out); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(printer); } -nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) { - impl::PlanToJsonVisitor visitor(&request_manager); +nlohmann::json PlanToJson(const RequestRouterInterface &request_router, const LogicalOperator *plan_root) { + impl::PlanToJsonVisitor visitor(&request_router); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(visitor); return visitor.output(); @@ -348,16 +348,16 @@ json ToJson(const utils::Bound &bound) { json ToJson(const Symbol &symbol) { return symbol.name(); } -json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager) { - return request_manager.EdgeTypeToName(edge_type); +json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_router) { + return request_router.EdgeTypeToName(edge_type); } -json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager) { - return request_manager.LabelToName(label); +json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_router) { + return request_router.LabelToName(label); } -json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager) { - return request_manager.PropertyToName(property); +json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_router) { + return request_router.PropertyToName(property); } json ToJson(NamedExpression *nexpr) { @@ -368,29 +368,29 @@ json ToJson(NamedExpression *nexpr) { } json ToJson(const std::vector> &properties, - const RequestRouterInterface &request_manager) { + const RequestRouterInterface &request_router) { json json; for (const auto &prop_pair : properties) { - json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second)); + json.emplace(ToJson(prop_pair.first, request_router), ToJson(prop_pair.second)); } return json; } -json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager) { +json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_router) { json self; self["symbol"] = ToJson(node_info.symbol); - self["labels"] = ToJson(node_info.labels, request_manager); + self["labels"] = ToJson(node_info.labels, request_router); const auto *props = std::get_if(&node_info.properties); - self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_manager); + self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_router); return self; } -json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager) { +json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_router) { json self; self["symbol"] = ToJson(edge_info.symbol); const auto *props = std::get_if(&edge_info.properties); - self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_manager); - self["edge_type"] = ToJson(edge_info.edge_type, request_manager); + self["properties"] = ToJson(props ? *props : PropertiesMapList{}, request_router); + self["edge_type"] = ToJson(edge_info.edge_type, request_router); self["direction"] = ToString(edge_info.direction); return self; } @@ -432,7 +432,7 @@ bool PlanToJsonVisitor::PreVisit(ScanAll &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabel &op) { json self; self["name"] = "ScanAllByLabel"; - self["label"] = ToJson(op.label_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); self["output_symbol"] = ToJson(op.output_symbol_); op.input_->Accept(*this); @@ -445,8 +445,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabel &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyRange &op) { json self; self["name"] = "ScanAllByLabelPropertyRange"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["lower_bound"] = op.lower_bound_ ? ToJson(*op.lower_bound_) : json(); self["upper_bound"] = op.upper_bound_ ? ToJson(*op.upper_bound_) : json(); self["output_symbol"] = ToJson(op.output_symbol_); @@ -461,8 +461,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyRange &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyValue &op) { json self; self["name"] = "ScanAllByLabelPropertyValue"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["expression"] = ToJson(op.expression_); self["output_symbol"] = ToJson(op.output_symbol_); @@ -476,8 +476,8 @@ bool PlanToJsonVisitor::PreVisit(ScanAllByLabelPropertyValue &op) { bool PlanToJsonVisitor::PreVisit(ScanAllByLabelProperty &op) { json self; self["name"] = "ScanAllByLabelProperty"; - self["label"] = ToJson(op.label_, *request_manager_); - self["property"] = ToJson(op.property_, *request_manager_); + self["label"] = ToJson(op.label_, *request_router_); + self["property"] = ToJson(op.property_, *request_router_); self["output_symbol"] = ToJson(op.output_symbol_); op.input_->Accept(*this); @@ -500,7 +500,7 @@ bool PlanToJsonVisitor::PreVisit(ScanAllById &op) { bool PlanToJsonVisitor::PreVisit(CreateNode &op) { json self; self["name"] = "CreateNode"; - self["node_info"] = ToJson(op.node_info_, *request_manager_); + self["node_info"] = ToJson(op.node_info_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); @@ -513,8 +513,8 @@ bool PlanToJsonVisitor::PreVisit(CreateExpand &op) { json self; self["name"] = "CreateExpand"; self["input_symbol"] = ToJson(op.input_symbol_); - self["node_info"] = ToJson(op.node_info_, *request_manager_); - self["edge_info"] = ToJson(op.edge_info_, *request_manager_); + self["node_info"] = ToJson(op.node_info_, *request_router_); + self["edge_info"] = ToJson(op.edge_info_, *request_router_); self["existing_node"] = op.existing_node_; op.input_->Accept(*this); @@ -530,7 +530,7 @@ bool PlanToJsonVisitor::PreVisit(Expand &op) { self["input_symbol"] = ToJson(op.input_symbol_); self["node_symbol"] = ToJson(op.common_.node_symbol); self["edge_symbol"] = ToJson(op.common_.edge_symbol); - self["edge_types"] = ToJson(op.common_.edge_types, *request_manager_); + self["edge_types"] = ToJson(op.common_.edge_types, *request_router_); self["direction"] = ToString(op.common_.direction); self["existing_node"] = op.common_.existing_node; @@ -547,7 +547,7 @@ bool PlanToJsonVisitor::PreVisit(ExpandVariable &op) { self["input_symbol"] = ToJson(op.input_symbol_); self["node_symbol"] = ToJson(op.common_.node_symbol); self["edge_symbol"] = ToJson(op.common_.edge_symbol); - self["edge_types"] = ToJson(op.common_.edge_types, *request_manager_); + self["edge_types"] = ToJson(op.common_.edge_types, *request_router_); self["direction"] = ToString(op.common_.direction); self["type"] = ToString(op.type_); self["is_reverse"] = op.is_reverse_; @@ -622,7 +622,7 @@ bool PlanToJsonVisitor::PreVisit(Delete &op) { bool PlanToJsonVisitor::PreVisit(SetProperty &op) { json self; self["name"] = "SetProperty"; - self["property"] = ToJson(op.property_, *request_manager_); + self["property"] = ToJson(op.property_, *request_router_); self["lhs"] = ToJson(op.lhs_); self["rhs"] = ToJson(op.rhs_); @@ -659,7 +659,7 @@ bool PlanToJsonVisitor::PreVisit(SetLabels &op) { json self; self["name"] = "SetLabels"; self["input_symbol"] = ToJson(op.input_symbol_); - self["labels"] = ToJson(op.labels_, *request_manager_); + self["labels"] = ToJson(op.labels_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); @@ -671,7 +671,7 @@ bool PlanToJsonVisitor::PreVisit(SetLabels &op) { bool PlanToJsonVisitor::PreVisit(RemoveProperty &op) { json self; self["name"] = "RemoveProperty"; - self["property"] = ToJson(op.property_, *request_manager_); + self["property"] = ToJson(op.property_, *request_router_); self["lhs"] = ToJson(op.lhs_); op.input_->Accept(*this); @@ -685,7 +685,7 @@ bool PlanToJsonVisitor::PreVisit(RemoveLabels &op) { json self; self["name"] = "RemoveLabels"; self["input_symbol"] = ToJson(op.input_symbol_); - self["labels"] = ToJson(op.labels_, *request_manager_); + self["labels"] = ToJson(op.labels_, *request_router_); op.input_->Accept(*this); self["input"] = PopOutput(); diff --git a/src/query/v2/plan/pretty_print.hpp b/src/query/v2/plan/pretty_print.hpp index 7e97de3ff..4094d7c81 100644 --- a/src/query/v2/plan/pretty_print.hpp +++ b/src/query/v2/plan/pretty_print.hpp @@ -30,16 +30,16 @@ class LogicalOperator; /// RequestRouter is needed for resolving label and property names. /// Note that `plan_root` isn't modified, but we can't take it as a const /// because we don't have support for visiting a const LogicalOperator. -void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out); +void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root, std::ostream *out); /// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`. -inline void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) { - PrettyPrint(request_manager, plan_root, &std::cout); +inline void PrettyPrint(const RequestRouterInterface &request_router, const LogicalOperator *plan_root) { + PrettyPrint(request_router, plan_root, &std::cout); } /// Convert a `LogicalOperator` plan to a JSON representation. /// DbAccessor is needed for resolving label and property names. -nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root); +nlohmann::json PlanToJson(const RequestRouterInterface &request_router, const LogicalOperator *plan_root); class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { public: @@ -47,7 +47,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::Visit; - PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out); + PlanPrinter(const RequestRouterInterface *request_router, std::ostream *out); bool DefaultPreVisit() override; @@ -114,7 +114,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { void Branch(LogicalOperator &op, const std::string &branch_name = ""); int64_t depth_{0}; - const RequestRouterInterface *request_manager_{nullptr}; + const RequestRouterInterface *request_router_{nullptr}; std::ostream *out_{nullptr}; }; @@ -132,20 +132,20 @@ nlohmann::json ToJson(const utils::Bound &bound); nlohmann::json ToJson(const Symbol &symbol); -nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager); +nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_router); -nlohmann::json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager); +nlohmann::json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_router); -nlohmann::json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager); +nlohmann::json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_router); nlohmann::json ToJson(NamedExpression *nexpr); nlohmann::json ToJson(const std::vector> &properties, - const RequestRouterInterface &request_manager); + const RequestRouterInterface &request_router); -nlohmann::json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager); +nlohmann::json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_router); -nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager); +nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_router); nlohmann::json ToJson(const Aggregate::Element &elem); @@ -160,7 +160,7 @@ nlohmann::json ToJson(const std::vector &items, Args &&...args) { class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { public: - explicit PlanToJsonVisitor(const RequestRouterInterface *request_manager) : request_manager_(request_manager) {} + explicit PlanToJsonVisitor(const RequestRouterInterface *request_router) : request_router_(request_router) {} using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; @@ -216,7 +216,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { protected: nlohmann::json output_; - const RequestRouterInterface *request_manager_; + const RequestRouterInterface *request_router_; nlohmann::json PopOutput() { nlohmann::json tmp; diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index ddd9351ae..8187f138b 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -151,10 +151,10 @@ void RunStorageRaft(Raft state{.label = "test_label"}; - auto result = io.Request(state); + auto result = request_router.Request(state); MG_ASSERT(result.size() == 2); { auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); @@ -163,7 +163,7 @@ void TestScanVertices(query::v2::RequestRouterInterface &io) { MG_ASSERT(prop.int_v == 444); } - result = io.Request(state); + result = request_router.Request(state); { MG_ASSERT(result.size() == 1); auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); @@ -171,11 +171,11 @@ void TestScanVertices(query::v2::RequestRouterInterface &io) { } } -void TestCreateVertices(query::v2::RequestRouterInterface &io) { +void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_vertices; - auto label_id = io.NameToLabel("test_label"); + auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}}; a1.label_ids.push_back({label_id}); msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; @@ -183,17 +183,17 @@ void TestCreateVertices(query::v2::RequestRouterInterface &io) { new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); - auto result = io.Request(state, std::move(new_vertices)); + auto result = request_router.Request(state, std::move(new_vertices)); MG_ASSERT(result.size() == 2); } -void TestCreateExpand(query::v2::RequestRouterInterface &io) { +void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_expands; - const auto edge_type_id = io.NameToEdgeType("edge_type"); - const auto label = msgs::Label{io.NameToLabel("test_label")}; + const auto edge_type_id = request_router.NameToEdgeType("edge_type"); + const auto label = msgs::Label{request_router.NameToLabel("test_label")}; const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; msgs::NewExpand expand_1{ @@ -203,7 +203,7 @@ void TestCreateExpand(query::v2::RequestRouterInterface &io) { new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_2)); - auto responses = io.Request(state, std::move(new_expands)); + auto responses = request_router.Request(state, std::move(new_expands)); MG_ASSERT(responses.size() == 2); MG_ASSERT(responses[0].success); MG_ASSERT(responses[1].success); @@ -222,7 +222,7 @@ void TestExpandOne(query::v2::RequestRouterInterface &request_router) { } template -void TestAggregate(RequestRouter &io) {} +void TestAggregate(RequestRouter &request_router) {} void DoTest() { SimulatorConfig config{ @@ -337,12 +337,12 @@ void DoTest() { // also get the current shard map CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); - query::v2::RequestRouter io(std::move(coordinator_client), std::move(cli_io)); + query::v2::RequestRouter request_router(std::move(coordinator_client), std::move(cli_io)); - io.StartTransaction(); - TestScanVertices(io); - TestCreateVertices(io); - TestCreateExpand(io); + request_router.StartTransaction(); + TestScanVertices(request_router); + TestCreateVertices(request_router); + TestCreateExpand(request_router); simulator.ShutDown(); From 4f18fa7431d9ef6a63a29e9e6da2089649fa75ed Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Tue, 29 Nov 2022 09:07:18 +0000 Subject: [PATCH 3/3] Fix LCP that broke with invisible merge conflict --- src/query/v2/frontend/ast/ast.lcp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/v2/frontend/ast/ast.lcp b/src/query/v2/frontend/ast/ast.lcp index dadcc9fa7..03c0c99f0 100644 --- a/src/query/v2/frontend/ast/ast.lcp +++ b/src/query/v2/frontend/ast/ast.lcp @@ -24,7 +24,7 @@ #include "query/v2/bindings/typed_value.hpp" #include "query/v2/db_accessor.hpp" #include "query/v2/path.hpp" -#include "query/v2/shard_request_manager.hpp" +#include "query/v2/request_router.hpp" #include "utils/typeinfo.hpp" #include "query/v2/conversions.hpp" @@ -838,14 +838,14 @@ cpp<# :slk-load (slk-load-ast-vector "Expression")) (function-name "std::string" :scope :public) (function "std::function &)>" + const functions::FunctionContext &)>" :scope :public :dont-save t :clone :copy :slk-load (lambda (member) #>cpp self->${member} = functions::NameToFunction, + functions::FunctionContext, functions::QueryEngineTag, decltype(ValueToTypedValueFunctor)>(self->function_name_); cpp<#))) (:public @@ -869,7 +869,7 @@ cpp<# const std::vector &arguments) : arguments_(arguments), function_name_(function_name), - function_(functions::NameToFunction, + function_(functions::NameToFunction, functions::QueryEngineTag, decltype(ValueToTypedValueFunctor)>(function_name_)) { if (!function_) { throw SemanticException("Function '{}' doesn't exist.", function_name);