Rename ShardRequestManager to RequestRouter

This commit is contained in:
Tyler Neely 2022-11-28 12:38:38 +00:00
parent f4d0c7769e
commit 82db1d4ad8
24 changed files with 255 additions and 273 deletions

View File

@ -18,8 +18,8 @@
#include "common/errors.hpp" #include "common/errors.hpp"
#include "coordinator/shard_map.hpp" #include "coordinator/shard_map.hpp"
#include "query/v2/accessors.hpp" #include "query/v2/accessors.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/edge_accessor.hpp" #include "storage/v3/edge_accessor.hpp"
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
#include "storage/v3/shard.hpp" #include "storage/v3/shard.hpp"
@ -72,7 +72,7 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
} }
communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex, communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router,
storage::v3::View /*view*/) { storage::v3::View /*view*/) {
auto id = communication::bolt::Id::FromUint(0); auto id = communication::bolt::Id::FromUint(0);
@ -80,44 +80,44 @@ communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAcces
std::vector<std::string> new_labels; std::vector<std::string> new_labels;
new_labels.reserve(labels.size()); new_labels.reserve(labels.size());
for (const auto &label : labels) { for (const auto &label : labels) {
new_labels.push_back(shard_request_manager->LabelToName(label.id)); new_labels.push_back(request_router->LabelToName(label.id));
} }
auto properties = vertex.Properties(); auto properties = vertex.Properties();
std::map<std::string, Value> new_properties; std::map<std::string, Value> new_properties;
for (const auto &[prop, property_value] : properties) { for (const auto &[prop, property_value] : properties) {
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value);
} }
return communication::bolt::Vertex{id, new_labels, new_properties}; return communication::bolt::Vertex{id, new_labels, new_properties};
} }
communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge, communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router,
storage::v3::View /*view*/) { storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication // TODO(jbajic) Fix bolt communication
auto id = communication::bolt::Id::FromUint(0); auto id = communication::bolt::Id::FromUint(0);
auto from = communication::bolt::Id::FromUint(0); auto from = communication::bolt::Id::FromUint(0);
auto to = communication::bolt::Id::FromUint(0); auto to = communication::bolt::Id::FromUint(0);
const auto &type = shard_request_manager->EdgeTypeToName(edge.EdgeType()); const auto &type = request_router->EdgeTypeToName(edge.EdgeType());
auto properties = edge.Properties(); auto properties = edge.Properties();
std::map<std::string, Value> new_properties; std::map<std::string, Value> new_properties;
for (const auto &[prop, property_value] : properties) { for (const auto &[prop, property_value] : properties) {
new_properties[shard_request_manager->PropertyToName(prop)] = ToBoltValue(property_value); new_properties[request_router->PropertyToName(prop)] = ToBoltValue(property_value);
} }
return communication::bolt::Edge{id, from, to, type, new_properties}; return communication::bolt::Edge{id, from, to, type, new_properties};
} }
communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/, communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/,
const query::v2::ShardRequestManagerInterface * /*shard_request_manager*/, const query::v2::RequestRouterInterface * /*request_router*/,
storage::v3::View /*view*/) { storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication // TODO(jbajic) Fix bolt communication
MG_ASSERT(false, "Path is unimplemented!"); MG_ASSERT(false, "Path is unimplemented!");
return {}; return {};
} }
Value ToBoltValue(const query::v2::TypedValue &value, Value ToBoltValue(const query::v2::TypedValue &value, const query::v2::RequestRouterInterface *request_router,
const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) { storage::v3::View view) {
switch (value.type()) { switch (value.type()) {
case query::v2::TypedValue::Type::Null: case query::v2::TypedValue::Type::Null:
return {}; return {};
@ -133,7 +133,7 @@ Value ToBoltValue(const query::v2::TypedValue &value,
std::vector<Value> values; std::vector<Value> values;
values.reserve(value.ValueList().size()); values.reserve(value.ValueList().size());
for (const auto &v : value.ValueList()) { for (const auto &v : value.ValueList()) {
auto value = ToBoltValue(v, shard_request_manager, view); auto value = ToBoltValue(v, request_router, view);
values.emplace_back(std::move(value)); values.emplace_back(std::move(value));
} }
return {std::move(values)}; return {std::move(values)};
@ -141,21 +141,21 @@ Value ToBoltValue(const query::v2::TypedValue &value,
case query::v2::TypedValue::Type::Map: { case query::v2::TypedValue::Type::Map: {
std::map<std::string, Value> map; std::map<std::string, Value> map;
for (const auto &kv : value.ValueMap()) { for (const auto &kv : value.ValueMap()) {
auto value = ToBoltValue(kv.second, shard_request_manager, view); auto value = ToBoltValue(kv.second, request_router, view);
map.emplace(kv.first, std::move(value)); map.emplace(kv.first, std::move(value));
} }
return {std::move(map)}; return {std::move(map)};
} }
case query::v2::TypedValue::Type::Vertex: { case query::v2::TypedValue::Type::Vertex: {
auto vertex = ToBoltVertex(value.ValueVertex(), shard_request_manager, view); auto vertex = ToBoltVertex(value.ValueVertex(), request_router, view);
return {std::move(vertex)}; return {std::move(vertex)};
} }
case query::v2::TypedValue::Type::Edge: { case query::v2::TypedValue::Type::Edge: {
auto edge = ToBoltEdge(value.ValueEdge(), shard_request_manager, view); auto edge = ToBoltEdge(value.ValueEdge(), request_router, view);
return {std::move(edge)}; return {std::move(edge)};
} }
case query::v2::TypedValue::Type::Path: { case query::v2::TypedValue::Type::Path: {
auto path = ToBoltPath(value.ValuePath(), shard_request_manager, view); auto path = ToBoltPath(value.ValuePath(), request_router, view);
return {std::move(path)}; return {std::move(path)};
} }
case query::v2::TypedValue::Type::Date: case query::v2::TypedValue::Type::Date:

View File

@ -15,7 +15,7 @@
#include "communication/bolt/v1/value.hpp" #include "communication/bolt/v1/value.hpp"
#include "coordinator/shard_map.hpp" #include "coordinator/shard_map.hpp"
#include "query/v2/bindings/typed_value.hpp" #include "query/v2/bindings/typed_value.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/result.hpp" #include "storage/v3/result.hpp"
#include "storage/v3/shard.hpp" #include "storage/v3/shard.hpp"
@ -32,40 +32,37 @@ namespace memgraph::glue::v2 {
/// @param storage::v3::VertexAccessor for converting to /// @param storage::v3::VertexAccessor for converting to
/// communication::bolt::Vertex. /// communication::bolt::Vertex.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting label and property names. /// @param query::v2::RequestRouterInterface *request_router getting label and property names.
/// @param storage::v3::View for deciding which vertex attributes are visible. /// @param storage::v3::View for deciding which vertex attributes are visible.
/// ///
/// @throw std::bad_alloc /// @throw std::bad_alloc
communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex, communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router,
storage::v3::View view); storage::v3::View view);
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge. /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. /// @param query::v2::RequestRouterInterface *request_router getting edge type and property names.
/// @param storage::v3::View for deciding which edge attributes are visible. /// @param storage::v3::View for deciding which edge attributes are visible.
/// ///
/// @throw std::bad_alloc /// @throw std::bad_alloc
communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge, communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router, storage::v3::View view);
storage::v3::View view);
/// @param query::v2::Path for converting to communication::bolt::Path. /// @param query::v2::Path for converting to communication::bolt::Path.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
/// ///
/// @throw std::bad_alloc /// @throw std::bad_alloc
communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path, communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router, storage::v3::View view);
storage::v3::View view);
/// @param query::v2::TypedValue for converting to communication::bolt::Value. /// @param query::v2::TypedValue for converting to communication::bolt::Value.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param query::v2::RequestRouterInterface *request_router ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
/// ///
/// @throw std::bad_alloc /// @throw std::bad_alloc
communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value, communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value,
const query::v2::ShardRequestManagerInterface *shard_request_manager, const query::v2::RequestRouterInterface *request_router, storage::v3::View view);
storage::v3::View view);
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value); query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
@ -75,8 +72,7 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val
communication::bolt::Value ToBoltValue(msgs::Value value); communication::bolt::Value ToBoltValue(msgs::Value value);
communication::bolt::Value ToBoltValue(msgs::Value value, communication::bolt::Value ToBoltValue(msgs::Value value, const query::v2::RequestRouterInterface *request_router,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view); storage::v3::View view);
} // namespace memgraph::glue::v2 } // namespace memgraph::glue::v2

View File

@ -454,7 +454,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n, std::map<std::string, memgraph::communication::bolt::Value> Pull(TEncoder *encoder, std::optional<int> n,
std::optional<int> qid) override { std::optional<int> qid) override {
TypedValueResultStream stream(encoder, interpreter_.GetShardRequestManager()); TypedValueResultStream stream(encoder, interpreter_.GetRequestRouter());
return PullResults(stream, n, qid); return PullResults(stream, n, qid);
} }
@ -481,7 +481,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
const auto &summary = interpreter_.Pull(&stream, n, qid); const auto &summary = interpreter_.Pull(&stream, n, qid);
std::map<std::string, memgraph::communication::bolt::Value> decoded_summary; std::map<std::string, memgraph::communication::bolt::Value> decoded_summary;
for (const auto &kv : summary) { for (const auto &kv : summary) {
auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetShardRequestManager(), auto bolt_value = memgraph::glue::v2::ToBoltValue(kv.second, interpreter_.GetRequestRouter(),
memgraph::storage::v3::View::NEW); memgraph::storage::v3::View::NEW);
decoded_summary.emplace(kv.first, std::move(bolt_value)); decoded_summary.emplace(kv.first, std::move(bolt_value));
} }
@ -497,15 +497,14 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
/// before forwarding the calls to original TEncoder. /// before forwarding the calls to original TEncoder.
class TypedValueResultStream { class TypedValueResultStream {
public: public:
TypedValueResultStream(TEncoder *encoder, TypedValueResultStream(TEncoder *encoder, const memgraph::query::v2::RequestRouterInterface *request_router)
const memgraph::query::v2::ShardRequestManagerInterface *shard_request_manager) : encoder_(encoder), request_router_(request_router) {}
: encoder_(encoder), shard_request_manager_(shard_request_manager) {}
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) { void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
std::vector<memgraph::communication::bolt::Value> decoded_values; std::vector<memgraph::communication::bolt::Value> decoded_values;
decoded_values.reserve(values.size()); decoded_values.reserve(values.size());
for (const auto &v : values) { for (const auto &v : values) {
auto bolt_value = memgraph::glue::v2::ToBoltValue(v, shard_request_manager_, memgraph::storage::v3::View::NEW); auto bolt_value = memgraph::glue::v2::ToBoltValue(v, request_router_, memgraph::storage::v3::View::NEW);
decoded_values.emplace_back(std::move(bolt_value)); decoded_values.emplace_back(std::move(bolt_value));
} }
encoder_->MessageRecord(decoded_values); encoder_->MessageRecord(decoded_values);
@ -513,7 +512,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
private: private:
TEncoder *encoder_; TEncoder *encoder_;
const memgraph::query::v2::ShardRequestManagerInterface *shard_request_manager_{nullptr}; const memgraph::query::v2::RequestRouterInterface *request_router_{nullptr};
}; };
memgraph::query::v2::Interpreter interpreter_; memgraph::query::v2::Interpreter interpreter_;
memgraph::communication::v2::ServerEndpoint endpoint_; memgraph::communication::v2::ServerEndpoint endpoint_;

View File

@ -10,12 +10,12 @@
// licenses/APL.txt. // licenses/APL.txt.
#include "query/v2/accessors.hpp" #include "query/v2/accessors.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
namespace memgraph::query::v2::accessors { namespace memgraph::query::v2::accessors {
EdgeAccessor::EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager) EdgeAccessor::EdgeAccessor(Edge edge, const RequestRouterInterface *manager)
: edge(std::move(edge)), manager_(manager) {} : edge(std::move(edge)), manager_(manager) {}
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; } EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
@ -44,11 +44,10 @@ VertexAccessor EdgeAccessor::From() const {
} }
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props, VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
const ShardRequestManagerInterface *manager) const RequestRouterInterface *manager)
: vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const RequestRouterInterface *manager)
const ShardRequestManagerInterface *manager)
: vertex(std::move(v)), manager_(manager) { : vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size()); properties.reserve(props.size());
for (auto &[id, value] : props) { for (auto &[id, value] : props) {
@ -57,7 +56,7 @@ VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
} }
VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
const ShardRequestManagerInterface *manager) const RequestRouterInterface *manager)
: vertex(std::move(v)), manager_(manager) { : vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size()); properties.reserve(props.size());
for (const auto &[id, value] : props) { for (const auto &[id, value] : props) {

View File

@ -25,7 +25,7 @@
#include "utils/memory_tracker.hpp" #include "utils/memory_tracker.hpp"
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
class ShardRequestManagerInterface; class RequestRouterInterface;
} // namespace memgraph::query::v2 } // namespace memgraph::query::v2
namespace memgraph::query::v2::accessors { namespace memgraph::query::v2::accessors {
@ -41,7 +41,7 @@ class VertexAccessor;
class EdgeAccessor final { class EdgeAccessor final {
public: public:
explicit EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager); explicit EdgeAccessor(Edge edge, const RequestRouterInterface *manager);
[[nodiscard]] EdgeTypeId EdgeType() const; [[nodiscard]] EdgeTypeId EdgeType() const;
@ -69,7 +69,7 @@ class EdgeAccessor final {
private: private:
Edge edge; Edge edge;
const ShardRequestManagerInterface *manager_; const RequestRouterInterface *manager_;
}; };
class VertexAccessor final { class VertexAccessor final {
@ -77,11 +77,10 @@ class VertexAccessor final {
using PropertyId = msgs::PropertyId; using PropertyId = msgs::PropertyId;
using Label = msgs::Label; using Label = msgs::Label;
using VertexId = msgs::VertexId; using VertexId = msgs::VertexId;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props, VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props, const RequestRouterInterface *manager);
const ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const ShardRequestManagerInterface *manager); VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const RequestRouterInterface *manager);
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const ShardRequestManagerInterface *manager); VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const RequestRouterInterface *manager);
[[nodiscard]] Label PrimaryLabel() const; [[nodiscard]] Label PrimaryLabel() const;
@ -150,7 +149,7 @@ class VertexAccessor final {
private: private:
Vertex vertex; Vertex vertex;
std::vector<std::pair<PropertyId, Value>> properties; std::vector<std::pair<PropertyId, Value>> properties;
const ShardRequestManagerInterface *manager_; const RequestRouterInterface *manager_;
}; };
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); } // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }

View File

@ -26,7 +26,7 @@
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
class ShardRequestManagerInterface; class RequestRouterInterface;
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); }; inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
namespace detail { namespace detail {
@ -35,15 +35,14 @@ class Callable {
auto operator()(const storage::v3::PropertyValue &val) const { auto operator()(const storage::v3::PropertyValue &val) const {
return storage::v3::PropertyToTypedValue<TypedValue>(val); return storage::v3::PropertyToTypedValue<TypedValue>(val);
}; };
auto operator()(const msgs::Value &val, ShardRequestManagerInterface *manager) const { auto operator()(const msgs::Value &val, RequestRouterInterface *manager) const {
return ValueToTypedValue(val, manager); return ValueToTypedValue(val, manager);
}; };
}; };
} // namespace detail } // namespace detail
using ExpressionEvaluator = using ExpressionEvaluator = expr::ExpressionEvaluator<TypedValue, query::v2::EvaluationContext, RequestRouterInterface,
expr::ExpressionEvaluator<TypedValue, query::v2::EvaluationContext, ShardRequestManagerInterface, storage::v3::View, storage::v3::View, storage::v3::LabelId, msgs::Value,
storage::v3::LabelId, msgs::Value, detail::Callable, common::ErrorCode, detail::Callable, common::ErrorCode, expr::QueryEngineTag>;
expr::QueryEngineTag>;
} // namespace memgraph::query::v2 } // namespace memgraph::query::v2

View File

@ -20,7 +20,7 @@
#include "query/v2/parameters.hpp" #include "query/v2/parameters.hpp"
#include "query/v2/plan/profile.hpp" #include "query/v2/plan/profile.hpp"
//#include "query/v2/trigger.hpp" //#include "query/v2/trigger.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "utils/async_timer.hpp" #include "utils/async_timer.hpp"
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
@ -61,26 +61,26 @@ struct EvaluationContext {
}; };
inline std::vector<storage::v3::PropertyId> NamesToProperties(const std::vector<std::string> &property_names, inline std::vector<storage::v3::PropertyId> NamesToProperties(const std::vector<std::string> &property_names,
ShardRequestManagerInterface *shard_request_manager) { RequestRouterInterface *request_router) {
std::vector<storage::v3::PropertyId> properties; std::vector<storage::v3::PropertyId> properties;
// TODO Fix by using reference // TODO Fix by using reference
properties.reserve(property_names.size()); properties.reserve(property_names.size());
if (shard_request_manager != nullptr) { if (request_router != nullptr) {
for (const auto &name : property_names) { for (const auto &name : property_names) {
properties.push_back(shard_request_manager->NameToProperty(name)); properties.push_back(request_router->NameToProperty(name));
} }
} }
return properties; return properties;
} }
inline std::vector<storage::v3::LabelId> NamesToLabels(const std::vector<std::string> &label_names, inline std::vector<storage::v3::LabelId> NamesToLabels(const std::vector<std::string> &label_names,
ShardRequestManagerInterface *shard_request_manager) { RequestRouterInterface *request_router) {
std::vector<storage::v3::LabelId> labels; std::vector<storage::v3::LabelId> labels;
labels.reserve(label_names.size()); labels.reserve(label_names.size());
// TODO Fix by using reference // TODO Fix by using reference
if (shard_request_manager != nullptr) { if (request_router != nullptr) {
for (const auto &name : label_names) { for (const auto &name : label_names) {
labels.push_back(shard_request_manager->NameToLabel(name)); labels.push_back(request_router->NameToLabel(name));
} }
} }
return labels; return labels;
@ -97,7 +97,7 @@ struct ExecutionContext {
plan::ProfilingStats *stats_root{nullptr}; plan::ProfilingStats *stats_root{nullptr};
ExecutionStats execution_stats; ExecutionStats execution_stats;
utils::AsyncTimer timer; utils::AsyncTimer timer;
ShardRequestManagerInterface *shard_request_manager{nullptr}; RequestRouterInterface *request_router{nullptr};
IdAllocator *edge_ids_alloc; IdAllocator *edge_ids_alloc;
}; };

View File

@ -12,12 +12,12 @@
#pragma once #pragma once
#include "bindings/typed_value.hpp" #include "bindings/typed_value.hpp"
#include "query/v2/accessors.hpp" #include "query/v2/accessors.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManagerInterface *manager) { inline TypedValue ValueToTypedValue(const msgs::Value &value, RequestRouterInterface *manager) {
using Value = msgs::Value; using Value = msgs::Value;
switch (value.type) { switch (value.type) {
case Value::Type::Null: case Value::Type::Null:

View File

@ -11,7 +11,7 @@
#include "query/v2/cypher_query_interpreter.hpp" #include "query/v2/cypher_query_interpreter.hpp"
#include "query/v2/bindings/symbol_generator.hpp" #include "query/v2/bindings/symbol_generator.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
// NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables) // NOLINTNEXTLINE (cppcoreguidelines-avoid-non-const-global-variables)
DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner."); DEFINE_HIDDEN_bool(query_cost_planner, true, "Use the cost-estimating query planner.");
@ -118,7 +118,7 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::stri
} }
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters, std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters,
ShardRequestManagerInterface *shard_manager, RequestRouterInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers) { const std::vector<Identifier *> &predefined_identifiers) {
auto vertex_counts = plan::MakeVertexCountCache(shard_manager); auto vertex_counts = plan::MakeVertexCountCache(shard_manager);
auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers);
@ -130,7 +130,7 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache, const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
ShardRequestManagerInterface *shard_manager, RequestRouterInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers) { const std::vector<Identifier *> &predefined_identifiers) {
std::optional<utils::SkipList<PlanCacheEntry>::Accessor> plan_cache_access; std::optional<utils::SkipList<PlanCacheEntry>::Accessor> plan_cache_access;
if (plan_cache) { if (plan_cache) {

View File

@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan {
}; };
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters, std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters,
ShardRequestManagerInterface *shard_manager, RequestRouterInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers); const std::vector<Identifier *> &predefined_identifiers);
/** /**
@ -145,7 +145,7 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
*/ */
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache, const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
ShardRequestManagerInterface *shard_manager, RequestRouterInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers = {}); const std::vector<Identifier *> &predefined_identifiers = {});
} // namespace memgraph::query::v2 } // namespace memgraph::query::v2

View File

@ -23,7 +23,7 @@
#include "query/v2/bindings/typed_value.hpp" #include "query/v2/bindings/typed_value.hpp"
#include "query/v2/conversions.hpp" #include "query/v2/conversions.hpp"
#include "query/v2/exceptions.hpp" #include "query/v2/exceptions.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "storage/v3/conversions.hpp" #include "storage/v3/conversions.hpp"
#include "utils/string.hpp" #include "utils/string.hpp"
#include "utils/temporal.hpp" #include "utils/temporal.hpp"

View File

@ -22,7 +22,7 @@
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
class ShardRequestManagerInterface; class RequestRouterInterface;
namespace { namespace {
const char kStartsWith[] = "STARTSWITH"; const char kStartsWith[] = "STARTSWITH";
@ -32,9 +32,9 @@ const char kId[] = "ID";
} // namespace } // namespace
struct FunctionContext { struct FunctionContext {
// TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage. // TODO(kostasrim) consider optional here. RequestRouter does not exist on the storage.
// DbAccessor *db_accessor; // DbAccessor *db_accessor;
ShardRequestManagerInterface *manager; RequestRouterInterface *manager;
utils::MemoryResource *memory; utils::MemoryResource *memory;
int64_t timestamp; int64_t timestamp;
std::unordered_map<std::string, int64_t> *counters; std::unordered_map<std::string, int64_t> *counters;

View File

@ -44,7 +44,7 @@
#include "query/v2/plan/planner.hpp" #include "query/v2/plan/planner.hpp"
#include "query/v2/plan/profile.hpp" #include "query/v2/plan/profile.hpp"
#include "query/v2/plan/vertex_count_cache.hpp" #include "query/v2/plan/vertex_count_cache.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "storage/v3/shard.hpp" #include "storage/v3/shard.hpp"
#include "utils/algorithm.hpp" #include "utils/algorithm.hpp"
@ -143,7 +143,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred. /// @throw QueryRuntimeException if an error ocurred.
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters, Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters,
ShardRequestManagerInterface *manager) { RequestRouterInterface *manager) {
// Empty frame for evaluation of password expression. This is OK since // Empty frame for evaluation of password expression. This is OK since
// password should be either null or string literal and it's evaluation // password should be either null or string literal and it's evaluation
// should not depend on frame. // should not depend on frame.
@ -312,7 +312,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
} }
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters, Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
InterpreterContext *interpreter_context, ShardRequestManagerInterface *manager, InterpreterContext *interpreter_context, RequestRouterInterface *manager,
std::vector<Notification> *notifications) { std::vector<Notification> *notifications) {
expr::Frame<TypedValue> frame(0); expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table; SymbolTable symbol_table;
@ -448,7 +448,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
} }
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters, Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters,
ShardRequestManagerInterface *manager) { RequestRouterInterface *manager) {
expr::Frame<TypedValue> frame(0); expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table; SymbolTable symbol_table;
EvaluationContext evaluation_context; EvaluationContext evaluation_context;
@ -649,7 +649,7 @@ struct PullPlanVector {
struct PullPlan { struct PullPlan {
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query, explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
ShardRequestManagerInterface *shard_request_manager = nullptr, RequestRouterInterface *request_router = nullptr,
// TriggerContextCollector *trigger_context_collector = nullptr, // TriggerContextCollector *trigger_context_collector = nullptr,
std::optional<size_t> memory_limit = {}); std::optional<size_t> memory_limit = {});
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n, std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
@ -679,7 +679,7 @@ struct PullPlan {
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query, PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
ShardRequestManagerInterface *shard_request_manager, const std::optional<size_t> memory_limit) RequestRouterInterface *request_router, const std::optional<size_t> memory_limit)
: plan_(plan), : plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)), cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory), frame_(plan->symbol_table().max_position(), execution_memory),
@ -688,14 +688,14 @@ PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &par
ctx_.symbol_table = plan->symbol_table(); ctx_.symbol_table = plan->symbol_table();
ctx_.evaluation_context.timestamp = QueryTimestamp(); ctx_.evaluation_context.timestamp = QueryTimestamp();
ctx_.evaluation_context.parameters = parameters; ctx_.evaluation_context.parameters = parameters;
ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, shard_request_manager); ctx_.evaluation_context.properties = NamesToProperties(plan->ast_storage().properties_, request_router);
ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, shard_request_manager); ctx_.evaluation_context.labels = NamesToLabels(plan->ast_storage().labels_, request_router);
if (interpreter_context->config.execution_timeout_sec > 0) { if (interpreter_context->config.execution_timeout_sec > 0) {
ctx_.timer = utils::AsyncTimer{interpreter_context->config.execution_timeout_sec}; ctx_.timer = utils::AsyncTimer{interpreter_context->config.execution_timeout_sec};
} }
ctx_.is_shutting_down = &interpreter_context->is_shutting_down; ctx_.is_shutting_down = &interpreter_context->is_shutting_down;
ctx_.is_profile_query = is_profile_query; ctx_.is_profile_query = is_profile_query;
ctx_.shard_request_manager = shard_request_manager; ctx_.request_router = request_router;
ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc; ctx_.edge_ids_alloc = &interpreter_context->edge_ids_alloc;
} }
@ -804,7 +804,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
auto query_io = interpreter_context_->io.ForkLocal(random_uuid); auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
shard_request_manager_ = std::make_unique<ShardRequestManager<io::local_transport::LocalTransport>>( request_router_ = std::make_unique<RequestRouter<io::local_transport::LocalTransport>>(
coordinator::CoordinatorClient<io::local_transport::LocalTransport>( coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
std::move(query_io)); std::move(query_io));
@ -881,7 +881,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba, InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MemoryResource *execution_memory, std::vector<Notification> *notifications, utils::MemoryResource *execution_memory, std::vector<Notification> *notifications,
ShardRequestManagerInterface *shard_request_manager) { RequestRouterInterface *request_router) {
// TriggerContextCollector *trigger_context_collector = nullptr) { // TriggerContextCollector *trigger_context_collector = nullptr) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query); auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
@ -890,8 +890,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
EvaluationContext evaluation_context; EvaluationContext evaluation_context;
evaluation_context.timestamp = QueryTimestamp(); evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parsed_query.parameters; evaluation_context.parameters = parsed_query.parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager, ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD);
storage::v3::View::OLD);
const auto memory_limit = const auto memory_limit =
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_); expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
if (memory_limit) { if (memory_limit) {
@ -906,9 +905,9 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
"convert the parsed row values to the appropriate type. This can be done using the built-in " "convert the parsed row values to the appropriate type. This can be done using the built-in "
"conversion functions such as ToInteger, ToFloat, ToBoolean etc."); "conversion functions such as ToInteger, ToFloat, ToBoolean etc.");
} }
auto plan = CypherQueryToPlan( auto plan = CypherQueryToPlan(parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query,
parsed_query.stripped_query.hash(), std::move(parsed_query.ast_storage), cypher_query, parsed_query.parameters, parsed_query.parameters,
parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, shard_request_manager); parsed_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router);
summary->insert_or_assign("cost_estimate", plan->cost()); summary->insert_or_assign("cost_estimate", plan->cost());
auto rw_type_checker = plan::ReadWriteTypeChecker(); auto rw_type_checker = plan::ReadWriteTypeChecker();
@ -927,7 +926,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first); utils::FindOr(parsed_query.stripped_query.named_expressions(), symbol.token_position(), symbol.name()).first);
} }
auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context, auto pull_plan = std::make_shared<PullPlan>(plan, parsed_query.parameters, false, dba, interpreter_context,
execution_memory, shard_request_manager, memory_limit); execution_memory, request_router, memory_limit);
// execution_memory, trigger_context_collector, memory_limit); // execution_memory, trigger_context_collector, memory_limit);
return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges), return PreparedQuery{std::move(header), std::move(parsed_query.required_privileges),
[pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary]( [pull_plan = std::move(pull_plan), output_symbols = std::move(output_symbols), summary](
@ -941,8 +940,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
} }
PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary, PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, InterpreterContext *interpreter_context, RequestRouterInterface *request_router,
ShardRequestManagerInterface *shard_request_manager,
utils::MemoryResource *execution_memory) { utils::MemoryResource *execution_memory) {
const std::string kExplainQueryStart = "explain "; const std::string kExplainQueryStart = "explain ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart), MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart),
@ -961,20 +959,20 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query); auto *cypher_query = utils::Downcast<CypherQuery>(parsed_inner_query.query);
MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN"); MG_ASSERT(cypher_query, "Cypher grammar should not allow other queries in EXPLAIN");
auto cypher_query_plan = CypherQueryToPlan( auto cypher_query_plan =
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, cypher_query, parsed_inner_query.parameters,
shard_request_manager); parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router);
std::stringstream printed_plan; std::stringstream printed_plan;
plan::PrettyPrint(*shard_request_manager, &cypher_query_plan->plan(), &printed_plan); plan::PrettyPrint(*request_router, &cypher_query_plan->plan(), &printed_plan);
std::vector<std::vector<TypedValue>> printed_plan_rows; std::vector<std::vector<TypedValue>> printed_plan_rows;
for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) { for (const auto &row : utils::Split(utils::RTrim(printed_plan.str()), "\n")) {
printed_plan_rows.push_back(std::vector<TypedValue>{TypedValue(row)}); printed_plan_rows.push_back(std::vector<TypedValue>{TypedValue(row)});
} }
summary->insert_or_assign("explain", plan::PlanToJson(*shard_request_manager, &cypher_query_plan->plan()).dump()); summary->insert_or_assign("explain", plan::PlanToJson(*request_router, &cypher_query_plan->plan()).dump());
return PreparedQuery{{"QUERY PLAN"}, return PreparedQuery{{"QUERY PLAN"},
std::move(parsed_query.required_privileges), std::move(parsed_query.required_privileges),
@ -991,7 +989,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_transaction, PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context, std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MemoryResource *execution_memory, DbAccessor *dba, utils::MemoryResource *execution_memory,
ShardRequestManagerInterface *shard_request_manager = nullptr) { RequestRouterInterface *request_router = nullptr) {
const std::string kProfileQueryStart = "profile "; const std::string kProfileQueryStart = "profile ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart), MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart),
@ -1034,22 +1032,21 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
EvaluationContext evaluation_context; EvaluationContext evaluation_context;
evaluation_context.timestamp = QueryTimestamp(); evaluation_context.timestamp = QueryTimestamp();
evaluation_context.parameters = parsed_inner_query.parameters; evaluation_context.parameters = parsed_inner_query.parameters;
ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, shard_request_manager, ExpressionEvaluator evaluator(&frame, symbol_table, evaluation_context, request_router, storage::v3::View::OLD);
storage::v3::View::OLD);
const auto memory_limit = const auto memory_limit =
expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_); expr::EvaluateMemoryLimit(&evaluator, cypher_query->memory_limit_, cypher_query->memory_scale_);
auto cypher_query_plan = CypherQueryToPlan( auto cypher_query_plan =
parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage), cypher_query, CypherQueryToPlan(parsed_inner_query.stripped_query.hash(), std::move(parsed_inner_query.ast_storage),
parsed_inner_query.parameters, parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, cypher_query, parsed_inner_query.parameters,
shard_request_manager); parsed_inner_query.is_cacheable ? &interpreter_context->plan_cache : nullptr, request_router);
auto rw_type_checker = plan::ReadWriteTypeChecker(); auto rw_type_checker = plan::ReadWriteTypeChecker();
rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan())); rw_type_checker.InferRWType(const_cast<plan::LogicalOperator &>(cypher_query_plan->plan()));
return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"}, return PreparedQuery{{"OPERATOR", "ACTUAL HITS", "RELATIVE TIME", "ABSOLUTE TIME", "CUSTOM DATA"},
std::move(parsed_query.required_privileges), std::move(parsed_query.required_privileges),
[plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters), [plan = std::move(cypher_query_plan), parameters = std::move(parsed_inner_query.parameters),
summary, dba, interpreter_context, execution_memory, memory_limit, shard_request_manager, summary, dba, interpreter_context, execution_memory, memory_limit, request_router,
// We want to execute the query we are profiling lazily, so we delay // We want to execute the query we are profiling lazily, so we delay
// the construction of the corresponding context. // the construction of the corresponding context.
stats_and_total_time = std::optional<plan::ProfilingStatsWithTotalTime>{}, stats_and_total_time = std::optional<plan::ProfilingStatsWithTotalTime>{},
@ -1058,7 +1055,7 @@ PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_tra
// No output symbols are given so that nothing is streamed. // No output symbols are given so that nothing is streamed.
if (!stats_and_total_time) { if (!stats_and_total_time) {
stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context, stats_and_total_time = PullPlan(plan, parameters, true, dba, interpreter_context,
execution_memory, shard_request_manager, memory_limit) execution_memory, request_router, memory_limit)
.Pull(stream, {}, {}, summary); .Pull(stream, {}, {}, summary);
pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time)); pull_plan = std::make_shared<PullPlanVector>(ProfilingStatsToTable(*stats_and_total_time));
} }
@ -1185,7 +1182,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context, std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MemoryResource *execution_memory, DbAccessor *dba, utils::MemoryResource *execution_memory,
ShardRequestManagerInterface *manager) { RequestRouterInterface *manager) {
if (in_explicit_transaction) { if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException(); throw UserModificationInMulticommandTxException();
} }
@ -1221,7 +1218,7 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context, std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
ShardRequestManagerInterface *manager) { RequestRouterInterface *manager) {
if (in_explicit_transaction) { if (in_explicit_transaction) {
throw ReplicationModificationInMulticommandTxException(); throw ReplicationModificationInMulticommandTxException();
} }
@ -1317,7 +1314,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
} }
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
ShardRequestManagerInterface *manager) { RequestRouterInterface *manager) {
if (in_explicit_transaction) { if (in_explicit_transaction) {
throw SettingConfigInMulticommandTxException{}; throw SettingConfigInMulticommandTxException{};
} }
@ -1521,7 +1518,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
if (!in_explicit_transaction_ && if (!in_explicit_transaction_ &&
(utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) || (utils::Downcast<CypherQuery>(parsed_query.query) || utils::Downcast<ExplainQuery>(parsed_query.query) ||
utils::Downcast<ProfileQuery>(parsed_query.query))) { utils::Downcast<ProfileQuery>(parsed_query.query))) {
shard_request_manager_->StartTransaction(); request_router_->StartTransaction();
} }
utils::Timer planning_timer; utils::Timer planning_timer;
@ -1530,14 +1527,14 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
if (utils::Downcast<CypherQuery>(parsed_query.query)) { if (utils::Downcast<CypherQuery>(parsed_query.query)) {
prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, prepared_query = PrepareCypherQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*execution_db_accessor_, &query_execution->execution_memory, &*execution_db_accessor_, &query_execution->execution_memory,
&query_execution->notifications, shard_request_manager_.get()); &query_execution->notifications, request_router_.get());
} else if (utils::Downcast<ExplainQuery>(parsed_query.query)) { } else if (utils::Downcast<ExplainQuery>(parsed_query.query)) {
prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_, prepared_query = PrepareExplainQuery(std::move(parsed_query), &query_execution->summary, interpreter_context_,
&*shard_request_manager_, &query_execution->execution_memory_with_exception); &*request_router_, &query_execution->execution_memory_with_exception);
} else if (utils::Downcast<ProfileQuery>(parsed_query.query)) { } else if (utils::Downcast<ProfileQuery>(parsed_query.query)) {
prepared_query = PrepareProfileQuery( prepared_query = PrepareProfileQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, interpreter_context_, &*execution_db_accessor_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); &query_execution->execution_memory_with_exception, request_router_.get());
} else if (utils::Downcast<DumpQuery>(parsed_query.query)) { } else if (utils::Downcast<DumpQuery>(parsed_query.query)) {
prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_, prepared_query = PrepareDumpQuery(std::move(parsed_query), &query_execution->summary, &*execution_db_accessor_,
&query_execution->execution_memory); &query_execution->execution_memory);
@ -1545,9 +1542,9 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_, prepared_query = PrepareIndexQuery(std::move(parsed_query), in_explicit_transaction_,
&query_execution->notifications, interpreter_context_); &query_execution->notifications, interpreter_context_);
} else if (utils::Downcast<AuthQuery>(parsed_query.query)) { } else if (utils::Downcast<AuthQuery>(parsed_query.query)) {
prepared_query = PrepareAuthQuery( prepared_query = PrepareAuthQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, interpreter_context_, interpreter_context_, &*execution_db_accessor_,
&*execution_db_accessor_, &query_execution->execution_memory_with_exception, shard_request_manager_.get()); &query_execution->execution_memory_with_exception, request_router_.get());
} else if (utils::Downcast<InfoQuery>(parsed_query.query)) { } else if (utils::Downcast<InfoQuery>(parsed_query.query)) {
prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary, prepared_query = PrepareInfoQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->summary,
interpreter_context_, interpreter_context_->db, interpreter_context_, interpreter_context_->db,
@ -1558,7 +1555,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
} else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) { } else if (utils::Downcast<ReplicationQuery>(parsed_query.query)) {
prepared_query = prepared_query =
PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications, PrepareReplicationQuery(std::move(parsed_query), in_explicit_transaction_, &query_execution->notifications,
interpreter_context_, shard_request_manager_.get()); interpreter_context_, request_router_.get());
} else if (utils::Downcast<LockPathQuery>(parsed_query.query)) { } else if (utils::Downcast<LockPathQuery>(parsed_query.query)) {
prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_, prepared_query = PrepareLockPathQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_,
&*execution_db_accessor_); &*execution_db_accessor_);
@ -1575,8 +1572,7 @@ Interpreter::PrepareResult Interpreter::Prepare(const std::string &query_string,
prepared_query = prepared_query =
PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_); PrepareCreateSnapshotQuery(std::move(parsed_query), in_explicit_transaction_, interpreter_context_);
} else if (utils::Downcast<SettingQuery>(parsed_query.query)) { } else if (utils::Downcast<SettingQuery>(parsed_query.query)) {
prepared_query = prepared_query = PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, request_router_.get());
PrepareSettingQuery(std::move(parsed_query), in_explicit_transaction_, shard_request_manager_.get());
} else if (utils::Downcast<VersionQuery>(parsed_query.query)) { } else if (utils::Downcast<VersionQuery>(parsed_query.query)) {
prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_); prepared_query = PrepareVersionQuery(std::move(parsed_query), in_explicit_transaction_);
} else if (utils::Downcast<SchemaQuery>(parsed_query.query)) { } else if (utils::Downcast<SchemaQuery>(parsed_query.query)) {
@ -1617,7 +1613,7 @@ void Interpreter::Commit() {
// For now, we will not check if there are some unfinished queries. // For now, we will not check if there are some unfinished queries.
// We should document clearly that all results should be pulled to complete // We should document clearly that all results should be pulled to complete
// a query. // a query.
shard_request_manager_->Commit(); request_router_->Commit();
if (!db_accessor_) return; if (!db_accessor_) return;
const auto reset_necessary_members = [this]() { const auto reset_necessary_members = [this]() {

View File

@ -296,7 +296,7 @@ class Interpreter final {
*/ */
void Abort(); void Abort();
const ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } const RequestRouterInterface *GetRequestRouter() const { return request_router_.get(); }
private: private:
struct QueryExecution { struct QueryExecution {
@ -342,7 +342,7 @@ class Interpreter final {
// move this unique_ptr into a shared_ptr. // move this unique_ptr into a shared_ptr.
std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_; std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_;
std::optional<DbAccessor> execution_db_accessor_; std::optional<DbAccessor> execution_db_accessor_;
std::unique_ptr<ShardRequestManagerInterface> shard_request_manager_; std::unique_ptr<RequestRouterInterface> request_router_;
bool in_explicit_transaction_{false}; bool in_explicit_transaction_{false};
bool expect_rollback_{false}; bool expect_rollback_{false};

View File

@ -39,8 +39,8 @@
#include "query/v2/frontend/ast/ast.hpp" #include "query/v2/frontend/ast/ast.hpp"
#include "query/v2/path.hpp" #include "query/v2/path.hpp"
#include "query/v2/plan/scoped_profile.hpp" #include "query/v2/plan/scoped_profile.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/conversions.hpp" #include "storage/v3/conversions.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "utils/algorithm.hpp" #include "utils/algorithm.hpp"
@ -177,7 +177,7 @@ class DistributedCreateNodeCursor : public Cursor {
bool Pull(Frame &frame, ExecutionContext &context) override { bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP("CreateNode"); SCOPED_PROFILE_OP("CreateNode");
if (input_cursor_->Pull(frame, context)) { if (input_cursor_->Pull(frame, context)) {
auto &shard_manager = context.shard_request_manager; auto &shard_manager = context.request_router;
{ {
SCOPED_REQUEST_WAIT_PROFILE; SCOPED_REQUEST_WAIT_PROFILE;
shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame)); shard_manager->Request(state_, NodeCreationInfoToRequest(context, frame));
@ -197,8 +197,8 @@ class DistributedCreateNodeCursor : public Cursor {
// TODO(kostasrim) Make this work with batching // TODO(kostasrim) Make this work with batching
const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]}; const auto primary_label = msgs::Label{.id = nodes_info_[0]->labels[0]};
msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])}; msgs::Vertex v{.id = std::make_pair(primary_label, primary_keys_[0])};
frame[nodes_info_.front()->symbol] = TypedValue( frame[nodes_info_.front()->symbol] =
query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.shard_request_manager)); TypedValue(query::v2::accessors::VertexAccessor(std::move(v), src_vertex_props_[0], context.request_router));
} }
std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) { std::vector<msgs::NewVertex> NodeCreationInfoToRequest(ExecutionContext &context, Frame &frame) {
@ -218,7 +218,7 @@ class DistributedCreateNodeCursor : public Cursor {
if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) { if (const auto *node_info_properties = std::get_if<PropertiesMapList>(&node_info->properties)) {
for (const auto &[key, value_expression] : *node_info_properties) { for (const auto &[key, value_expression] : *node_info_properties) {
TypedValue val = value_expression->Accept(evaluator); TypedValue val = value_expression->Accept(evaluator);
if (context.shard_request_manager->IsPrimaryKey(primary_label, key)) { if (context.request_router->IsPrimaryKey(primary_label, key)) {
rqst.primary_key.push_back(TypedValueToValue(val)); rqst.primary_key.push_back(TypedValueToValue(val));
pk.push_back(TypedValueToValue(val)); pk.push_back(TypedValueToValue(val));
} }
@ -227,8 +227,8 @@ class DistributedCreateNodeCursor : public Cursor {
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap(); auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(node_info->properties)).ValueMap();
for (const auto &[key, value] : property_map) { for (const auto &[key, value] : property_map) {
auto key_str = std::string(key); auto key_str = std::string(key);
auto property_id = context.shard_request_manager->NameToProperty(key_str); auto property_id = context.request_router->NameToProperty(key_str);
if (context.shard_request_manager->IsPrimaryKey(primary_label, property_id)) { if (context.request_router->IsPrimaryKey(primary_label, property_id)) {
rqst.primary_key.push_back(TypedValueToValue(value)); rqst.primary_key.push_back(TypedValueToValue(value));
pk.push_back(TypedValueToValue(value)); pk.push_back(TypedValueToValue(value));
} }
@ -386,7 +386,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
using VertexAccessor = accessors::VertexAccessor; using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { bool MakeRequest(RequestRouterInterface &shard_manager, ExecutionContext &context) {
{ {
SCOPED_REQUEST_WAIT_PROFILE; SCOPED_REQUEST_WAIT_PROFILE;
current_batch = shard_manager.Request(request_state_); current_batch = shard_manager.Request(request_state_);
@ -398,7 +398,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
bool Pull(Frame &frame, ExecutionContext &context) override { bool Pull(Frame &frame, ExecutionContext &context) override {
SCOPED_PROFILE_OP(op_name_); SCOPED_PROFILE_OP(op_name_);
auto &shard_manager = *context.shard_request_manager; auto &shard_manager = *context.request_router;
while (true) { while (true) {
if (MustAbort(context)) { if (MustAbort(context)) {
throw HintedAbortError(); throw HintedAbortError();
@ -696,7 +696,7 @@ bool Filter::FilterCursor::Pull(Frame &frame, ExecutionContext &context) {
// Like all filters, newly set values should not affect filtering of old // Like all filters, newly set values should not affect filtering of old
// nodes and edges. // nodes and edges.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.shard_request_manager, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
storage::v3::View::OLD); storage::v3::View::OLD);
while (input_cursor_->Pull(frame, context)) { while (input_cursor_->Pull(frame, context)) {
if (EvaluateFilter(evaluator, self_.expression_)) return true; if (EvaluateFilter(evaluator, self_.expression_)) return true;
@ -737,8 +737,8 @@ bool Produce::ProduceCursor::Pull(Frame &frame, ExecutionContext &context) {
if (input_cursor_->Pull(frame, context)) { if (input_cursor_->Pull(frame, context)) {
// Produce should always yield the latest results. // Produce should always yield the latest results.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::NEW); storage::v3::View::NEW);
for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator); for (auto named_expr : self_.named_expressions_) named_expr->Accept(evaluator);
return true; return true;
@ -1122,8 +1122,8 @@ class AggregateCursor : public Cursor {
* aggregation results, and not on the number of inputs. * aggregation results, and not on the number of inputs.
*/ */
void ProcessAll(Frame *frame, ExecutionContext *context) { void ProcessAll(Frame *frame, ExecutionContext *context) {
ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, ExpressionEvaluator evaluator(frame, context->symbol_table, context->evaluation_context, context->request_router,
context->shard_request_manager, storage::v3::View::NEW); storage::v3::View::NEW);
while (input_cursor_->Pull(*frame, *context)) { while (input_cursor_->Pull(*frame, *context)) {
ProcessOne(*frame, &evaluator); ProcessOne(*frame, &evaluator);
} }
@ -1343,8 +1343,8 @@ bool Skip::SkipCursor::Pull(Frame &frame, ExecutionContext &context) {
// First successful pull from the input, evaluate the skip expression. // First successful pull from the input, evaluate the skip expression.
// The skip expression doesn't contain identifiers so graph view // The skip expression doesn't contain identifiers so graph view
// parameter is not important. // parameter is not important.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::OLD); storage::v3::View::OLD);
TypedValue to_skip = self_.expression_->Accept(evaluator); TypedValue to_skip = self_.expression_->Accept(evaluator);
if (to_skip.type() != TypedValue::Type::Int) if (to_skip.type() != TypedValue::Type::Int)
throw QueryRuntimeException("Number of elements to skip must be an integer."); throw QueryRuntimeException("Number of elements to skip must be an integer.");
@ -1398,8 +1398,8 @@ bool Limit::LimitCursor::Pull(Frame &frame, ExecutionContext &context) {
if (limit_ == -1) { if (limit_ == -1) {
// Limit expression doesn't contain identifiers so graph view is not // Limit expression doesn't contain identifiers so graph view is not
// important. // important.
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::OLD); storage::v3::View::OLD);
TypedValue limit = self_.expression_->Accept(evaluator); TypedValue limit = self_.expression_->Accept(evaluator);
if (limit.type() != TypedValue::Type::Int) if (limit.type() != TypedValue::Type::Int)
throw QueryRuntimeException("Limit on number of returned elements must be an integer."); throw QueryRuntimeException("Limit on number of returned elements must be an integer.");
@ -1454,8 +1454,8 @@ class OrderByCursor : public Cursor {
SCOPED_PROFILE_OP("OrderBy"); SCOPED_PROFILE_OP("OrderBy");
if (!did_pull_all_) { if (!did_pull_all_) {
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::OLD); storage::v3::View::OLD);
auto *mem = cache_.get_allocator().GetMemoryResource(); auto *mem = cache_.get_allocator().GetMemoryResource();
while (input_cursor_->Pull(frame, context)) { while (input_cursor_->Pull(frame, context)) {
// collect the order_by elements // collect the order_by elements
@ -1712,8 +1712,8 @@ class UnwindCursor : public Cursor {
if (!input_cursor_->Pull(frame, context)) return false; if (!input_cursor_->Pull(frame, context)) return false;
// successful pull from input, initialize value and iterator // successful pull from input, initialize value and iterator
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::OLD); storage::v3::View::OLD);
TypedValue input_value = self_.input_expression_->Accept(evaluator); TypedValue input_value = self_.input_expression_->Accept(evaluator);
if (input_value.type() != TypedValue::Type::List) if (input_value.type() != TypedValue::Type::List)
throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type()); throw QueryRuntimeException("Argument of UNWIND must be a list, but '{}' was provided.", input_value.type());
@ -2224,7 +2224,7 @@ class LoadCsvCursor : public Cursor {
Frame frame(0); Frame frame(0);
SymbolTable symbol_table; SymbolTable symbol_table;
auto evaluator = auto evaluator =
ExpressionEvaluator(&frame, symbol_table, eval_context, context.shard_request_manager, storage::v3::View::OLD); ExpressionEvaluator(&frame, symbol_table, eval_context, context.request_router, storage::v3::View::OLD);
auto maybe_file = ToOptionalString(&evaluator, self_->file_); auto maybe_file = ToOptionalString(&evaluator, self_->file_);
auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_); auto maybe_delim = ToOptionalString(&evaluator, self_->delimiter_);
@ -2261,8 +2261,8 @@ class ForeachCursor : public Cursor {
return false; return false;
} }
ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, ExpressionEvaluator evaluator(&frame, context.symbol_table, context.evaluation_context, context.request_router,
context.shard_request_manager, storage::v3::View::NEW); storage::v3::View::NEW);
TypedValue expr_result = expression->Accept(evaluator); TypedValue expr_result = expression->Accept(evaluator);
if (expr_result.IsNull()) { if (expr_result.IsNull()) {
@ -2338,7 +2338,7 @@ class DistributedCreateExpandCursor : public Cursor {
if (!input_cursor_->Pull(frame, context)) { if (!input_cursor_->Pull(frame, context)) {
return false; return false;
} }
auto &shard_manager = context.shard_request_manager; auto &shard_manager = context.request_router;
ResetExecutionState(); ResetExecutionState();
{ {
SCOPED_REQUEST_WAIT_PROFILE; SCOPED_REQUEST_WAIT_PROFILE;
@ -2379,7 +2379,7 @@ class DistributedCreateExpandCursor : public Cursor {
// handle parameter // handle parameter
auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(edge_info.properties)).ValueMap(); auto property_map = evaluator.Visit(*std::get<ParameterLookup *>(edge_info.properties)).ValueMap();
for (const auto &[property, value] : property_map) { for (const auto &[property, value] : property_map) {
const auto property_id = context.shard_request_manager->NameToProperty(std::string(property)); const auto property_id = context.request_router->NameToProperty(std::string(property));
request.properties.emplace_back(property_id, storage::v3::TypedValueToValue(value)); request.properties.emplace_back(property_id, storage::v3::TypedValueToValue(value));
} }
} }
@ -2394,7 +2394,7 @@ class DistributedCreateExpandCursor : public Cursor {
const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) { const auto set_vertex = [&context](const auto &vertex, auto &vertex_id) {
vertex_id.first = vertex.PrimaryLabel(); vertex_id.first = vertex.PrimaryLabel();
for (const auto &[key, val] : vertex.Properties()) { for (const auto &[key, val] : vertex.Properties()) {
if (context.shard_request_manager->IsPrimaryKey(vertex_id.first.id, key)) { if (context.request_router->IsPrimaryKey(vertex_id.first.id, key)) {
vertex_id.second.push_back(val); vertex_id.second.push_back(val);
} }
} }
@ -2474,11 +2474,11 @@ class DistributedExpandCursor : public Cursor {
request.src_vertices.push_back(get_dst_vertex(edge, direction)); request.src_vertices.push_back(get_dst_vertex(edge, direction));
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
ExecutionState<msgs::ExpandOneRequest> request_state; ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); auto result_rows = context.request_router->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1); MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front(); auto &result_row = result_rows.front();
frame[self_.common_.node_symbol] = accessors::VertexAccessor( frame[self_.common_.node_symbol] = accessors::VertexAccessor(
msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.shard_request_manager); msgs::Vertex{result_row.src_vertex}, result_row.src_vertex_properties, context.request_router);
} }
bool InitEdges(Frame &frame, ExecutionContext &context) { bool InitEdges(Frame &frame, ExecutionContext &context) {
@ -2502,7 +2502,7 @@ class DistributedExpandCursor : public Cursor {
ExecutionState<msgs::ExpandOneRequest> request_state; ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = std::invoke([&context, &request_state, &request]() mutable { auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE; SCOPED_REQUEST_WAIT_PROFILE;
return context.shard_request_manager->Request(request_state, std::move(request)); return context.request_router->Request(request_state, std::move(request));
}); });
MG_ASSERT(result_rows.size() == 1); MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front(); auto &result_row = result_rows.front();
@ -2525,14 +2525,14 @@ class DistributedExpandCursor : public Cursor {
case EdgeAtom::Direction::IN: { case EdgeAtom::Direction::IN: {
for (auto &edge : edge_messages) { for (auto &edge : edge_messages) {
edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type}, edge_accessors.emplace_back(msgs::Edge{std::move(edge.other_end), vertex.Id(), {}, {edge.gid}, edge.type},
context.shard_request_manager); context.request_router);
} }
break; break;
} }
case EdgeAtom::Direction::OUT: { case EdgeAtom::Direction::OUT: {
for (auto &edge : edge_messages) { for (auto &edge : edge_messages) {
edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type}, edge_accessors.emplace_back(msgs::Edge{vertex.Id(), std::move(edge.other_end), {}, {edge.gid}, edge.type},
context.shard_request_manager); context.request_router);
} }
break; break;
} }

View File

@ -14,12 +14,12 @@
#include "query/v2/bindings/pretty_print.hpp" #include "query/v2/bindings/pretty_print.hpp"
#include "query/v2/db_accessor.hpp" #include "query/v2/db_accessor.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "utils/string.hpp" #include "utils/string.hpp"
namespace memgraph::query::v2::plan { namespace memgraph::query::v2::plan {
PlanPrinter::PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out) PlanPrinter::PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out)
: request_manager_(request_manager), out_(out) {} : request_manager_(request_manager), out_(out) {}
#define PRE_VISIT(TOp) \ #define PRE_VISIT(TOp) \
@ -263,14 +263,13 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string
--depth_; --depth_;
} }
void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out) {
std::ostream *out) {
PlanPrinter printer(&request_manager, out); PlanPrinter printer(&request_manager, out);
// FIXME(mtomic): We should make visitors that take const arguments. // FIXME(mtomic): We should make visitors that take const arguments.
const_cast<LogicalOperator *>(plan_root)->Accept(printer); const_cast<LogicalOperator *>(plan_root)->Accept(printer);
} }
nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) {
impl::PlanToJsonVisitor visitor(&request_manager); impl::PlanToJsonVisitor visitor(&request_manager);
// FIXME(mtomic): We should make visitors that take const arguments. // FIXME(mtomic): We should make visitors that take const arguments.
const_cast<LogicalOperator *>(plan_root)->Accept(visitor); const_cast<LogicalOperator *>(plan_root)->Accept(visitor);
@ -349,15 +348,15 @@ json ToJson(const utils::Bound<Expression *> &bound) {
json ToJson(const Symbol &symbol) { return symbol.name(); } json ToJson(const Symbol &symbol) { return symbol.name(); }
json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager) { json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager) {
return request_manager.EdgeTypeToName(edge_type); return request_manager.EdgeTypeToName(edge_type);
} }
json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager) { json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager) {
return request_manager.LabelToName(label); return request_manager.LabelToName(label);
} }
json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager) { json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager) {
return request_manager.PropertyToName(property); return request_manager.PropertyToName(property);
} }
@ -369,7 +368,7 @@ json ToJson(NamedExpression *nexpr) {
} }
json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties, json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties,
const ShardRequestManagerInterface &request_manager) { const RequestRouterInterface &request_manager) {
json json; json json;
for (const auto &prop_pair : properties) { for (const auto &prop_pair : properties) {
json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second)); json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second));
@ -377,7 +376,7 @@ json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>>
return json; return json;
} }
json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager) { json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager) {
json self; json self;
self["symbol"] = ToJson(node_info.symbol); self["symbol"] = ToJson(node_info.symbol);
self["labels"] = ToJson(node_info.labels, request_manager); self["labels"] = ToJson(node_info.labels, request_manager);
@ -386,7 +385,7 @@ json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterfac
return self; return self;
} }
json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager) { json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager) {
json self; json self;
self["symbol"] = ToJson(edge_info.symbol); self["symbol"] = ToJson(edge_info.symbol);
const auto *props = std::get_if<PropertiesMapList>(&edge_info.properties); const auto *props = std::get_if<PropertiesMapList>(&edge_info.properties);

View File

@ -18,7 +18,7 @@
#include "query/v2/frontend/ast/ast.hpp" #include "query/v2/frontend/ast/ast.hpp"
#include "query/v2/plan/operator.hpp" #include "query/v2/plan/operator.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
namespace memgraph::query::v2 { namespace memgraph::query::v2 {
@ -27,20 +27,19 @@ namespace plan {
class LogicalOperator; class LogicalOperator;
/// Pretty print a `LogicalOperator` plan to a `std::ostream`. /// Pretty print a `LogicalOperator` plan to a `std::ostream`.
/// ShardRequestManager is needed for resolving label and property names. /// RequestRouter is needed for resolving label and property names.
/// Note that `plan_root` isn't modified, but we can't take it as a const /// Note that `plan_root` isn't modified, but we can't take it as a const
/// because we don't have support for visiting a const LogicalOperator. /// because we don't have support for visiting a const LogicalOperator.
void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out);
std::ostream *out);
/// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`. /// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`.
inline void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { inline void PrettyPrint(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root) {
PrettyPrint(request_manager, plan_root, &std::cout); PrettyPrint(request_manager, plan_root, &std::cout);
} }
/// Convert a `LogicalOperator` plan to a JSON representation. /// Convert a `LogicalOperator` plan to a JSON representation.
/// DbAccessor is needed for resolving label and property names. /// DbAccessor is needed for resolving label and property names.
nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root); nlohmann::json PlanToJson(const RequestRouterInterface &request_manager, const LogicalOperator *plan_root);
class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
public: public:
@ -48,7 +47,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::PreVisit;
using HierarchicalLogicalOperatorVisitor::Visit; using HierarchicalLogicalOperatorVisitor::Visit;
PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out); PlanPrinter(const RequestRouterInterface *request_manager, std::ostream *out);
bool DefaultPreVisit() override; bool DefaultPreVisit() override;
@ -115,7 +114,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
void Branch(LogicalOperator &op, const std::string &branch_name = ""); void Branch(LogicalOperator &op, const std::string &branch_name = "");
int64_t depth_{0}; int64_t depth_{0};
const ShardRequestManagerInterface *request_manager_{nullptr}; const RequestRouterInterface *request_manager_{nullptr};
std::ostream *out_{nullptr}; std::ostream *out_{nullptr};
}; };
@ -133,20 +132,20 @@ nlohmann::json ToJson(const utils::Bound<Expression *> &bound);
nlohmann::json ToJson(const Symbol &symbol); nlohmann::json ToJson(const Symbol &symbol);
nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const RequestRouterInterface &request_manager);
nlohmann::json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(storage::v3::LabelId label, const RequestRouterInterface &request_manager);
nlohmann::json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(storage::v3::PropertyId property, const RequestRouterInterface &request_manager);
nlohmann::json ToJson(NamedExpression *nexpr); nlohmann::json ToJson(NamedExpression *nexpr);
nlohmann::json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties, nlohmann::json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties,
const ShardRequestManagerInterface &request_manager); const RequestRouterInterface &request_manager);
nlohmann::json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(const NodeCreationInfo &node_info, const RequestRouterInterface &request_manager);
nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const RequestRouterInterface &request_manager);
nlohmann::json ToJson(const Aggregate::Element &elem); nlohmann::json ToJson(const Aggregate::Element &elem);
@ -161,7 +160,7 @@ nlohmann::json ToJson(const std::vector<T> &items, Args &&...args) {
class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
public: public:
explicit PlanToJsonVisitor(const ShardRequestManagerInterface *request_manager) : request_manager_(request_manager) {} explicit PlanToJsonVisitor(const RequestRouterInterface *request_manager) : request_manager_(request_manager) {}
using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PostVisit;
using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::PreVisit;
@ -217,7 +216,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
protected: protected:
nlohmann::json output_; nlohmann::json output_;
const ShardRequestManagerInterface *request_manager_; const RequestRouterInterface *request_manager_;
nlohmann::json PopOutput() { nlohmann::json PopOutput() {
nlohmann::json tmp; nlohmann::json tmp;

View File

@ -272,7 +272,7 @@ class RuleBasedPlanner {
PropertiesMapList vector_props; PropertiesMapList vector_props;
vector_props.reserve(node_properties->size()); vector_props.reserve(node_properties->size());
for (const auto &kv : *node_properties) { for (const auto &kv : *node_properties) {
// TODO(kostasrim) GetProperty should be implemented in terms of ShardRequestManager NameToProperty // TODO(kostasrim) GetProperty should be implemented in terms of RequestRouter NameToProperty
vector_props.push_back({GetProperty(kv.first), kv.second}); vector_props.push_back({GetProperty(kv.first), kv.second});
} }
return std::move(vector_props); return std::move(vector_props);

View File

@ -15,7 +15,7 @@
#include <optional> #include <optional>
#include "query/v2/bindings/typed_value.hpp" #include "query/v2/bindings/typed_value.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "storage/v3/conversions.hpp" #include "storage/v3/conversions.hpp"
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
@ -29,11 +29,11 @@ namespace memgraph::query::v2::plan {
template <class TDbAccessor> template <class TDbAccessor>
class VertexCountCache { class VertexCountCache {
public: public:
explicit VertexCountCache(TDbAccessor *shard_request_manager) : shard_request_manager_{shard_request_manager} {} explicit VertexCountCache(TDbAccessor *request_router) : request_router_{request_router} {}
auto NameToLabel(const std::string &name) { return shard_request_manager_->NameToLabel(name); } auto NameToLabel(const std::string &name) { return request_router_->NameToLabel(name); }
auto NameToProperty(const std::string &name) { return shard_request_manager_->NameToProperty(name); } auto NameToProperty(const std::string &name) { return request_router_->NameToProperty(name); }
auto NameToEdgeType(const std::string &name) { return shard_request_manager_->NameToEdgeType(name); } auto NameToEdgeType(const std::string &name) { return request_router_->NameToEdgeType(name); }
int64_t VerticesCount() { return 1; } int64_t VerticesCount() { return 1; }
@ -53,11 +53,11 @@ class VertexCountCache {
} }
// For now return true if label is primary label // For now return true if label is primary label
bool LabelIndexExists(storage::v3::LabelId label) { return shard_request_manager_->IsPrimaryLabel(label); } bool LabelIndexExists(storage::v3::LabelId label) { return request_router_->IsPrimaryLabel(label); }
bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; }
ShardRequestManagerInterface *shard_request_manager_; RequestRouterInterface *request_router_;
}; };
template <class TDbAccessor> template <class TDbAccessor>

View File

@ -83,10 +83,10 @@ struct ExecutionState {
// CompoundKey is optional because some operators require to iterate over all the available keys // CompoundKey is optional because some operators require to iterate over all the available keys
// of a shard. One example is ScanAll, where we only require the field label. // of a shard. One example is ScanAll, where we only require the field label.
std::optional<CompoundKey> key; std::optional<CompoundKey> key;
// Transaction id to be filled by the ShardRequestManager implementation // Transaction id to be filled by the RequestRouter implementation
coordinator::Hlc transaction_id; coordinator::Hlc transaction_id;
// Initialized by ShardRequestManager implementation. This vector is filled with the shards that // Initialized by RequestRouter implementation. This vector is filled with the shards that
// the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that // the RequestRouter impl will send requests to. When a request to a shard exhausts it, meaning that
// it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes
// empty, it means that all of the requests have completed succefully. // empty, it means that all of the requests have completed succefully.
// TODO(gvolfing) // TODO(gvolfing)
@ -101,16 +101,16 @@ struct ExecutionState {
State state = INITIALIZING; State state = INITIALIZING;
}; };
class ShardRequestManagerInterface { class RequestRouterInterface {
public: public:
using VertexAccessor = query::v2::accessors::VertexAccessor; using VertexAccessor = query::v2::accessors::VertexAccessor;
ShardRequestManagerInterface() = default; RequestRouterInterface() = default;
ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; RequestRouterInterface(const RequestRouterInterface &) = delete;
ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; RequestRouterInterface(RequestRouterInterface &&) = delete;
ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete; RequestRouterInterface &operator=(const RequestRouterInterface &) = delete;
ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete; RequestRouterInterface &&operator=(RequestRouterInterface &&) = delete;
virtual ~ShardRequestManagerInterface() = default; virtual ~RequestRouterInterface() = default;
virtual void StartTransaction() = 0; virtual void StartTransaction() = 0;
virtual void Commit() = 0; virtual void Commit() = 0;
@ -134,7 +134,7 @@ class ShardRequestManagerInterface {
// TODO(kostasrim)rename this class template // TODO(kostasrim)rename this class template
template <typename TTransport> template <typename TTransport>
class ShardRequestManager : public ShardRequestManagerInterface { class RequestRouter : public RequestRouterInterface {
public: public:
using StorageClient = coordinator::RsmClient<TTransport, msgs::WriteRequests, msgs::WriteResponses, using StorageClient = coordinator::RsmClient<TTransport, msgs::WriteRequests, msgs::WriteResponses,
msgs::ReadRequests, msgs::ReadResponses>; msgs::ReadRequests, msgs::ReadResponses>;
@ -145,15 +145,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
using ShardMap = coordinator::ShardMap; using ShardMap = coordinator::ShardMap;
using CompoundKey = coordinator::PrimaryKey; using CompoundKey = coordinator::PrimaryKey;
using VertexAccessor = query::v2::accessors::VertexAccessor; using VertexAccessor = query::v2::accessors::VertexAccessor;
ShardRequestManager(CoordinatorClient coord, io::Io<TTransport> &&io) RequestRouter(CoordinatorClient coord, io::Io<TTransport> &&io) : coord_cli_(std::move(coord)), io_(std::move(io)) {}
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
ShardRequestManager(const ShardRequestManager &) = delete; RequestRouter(const RequestRouter &) = delete;
ShardRequestManager(ShardRequestManager &&) = delete; RequestRouter(RequestRouter &&) = delete;
ShardRequestManager &operator=(const ShardRequestManager &) = delete; RequestRouter &operator=(const RequestRouter &) = delete;
ShardRequestManager &operator=(ShardRequestManager &&) = delete; RequestRouter &operator=(RequestRouter &&) = delete;
~ShardRequestManager() override {} ~RequestRouter() override {}
void StartTransaction() override { void StartTransaction() override {
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};

View File

@ -31,8 +31,8 @@
#include "io/simulator/simulator_transport.hpp" #include "io/simulator/simulator_transport.hpp"
#include "query/v2/accessors.hpp" #include "query/v2/accessors.hpp"
#include "query/v2/conversions.hpp" #include "query/v2/conversions.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "storage/v3/property_value.hpp" #include "storage/v3/property_value.hpp"
#include "utils/result.hpp" #include "utils/result.hpp"
@ -151,7 +151,7 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses,
server.Run(); server.Run();
} }
void TestScanVertices(query::v2::ShardRequestManagerInterface &io) { void TestScanVertices(query::v2::RequestRouterInterface &io) {
msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"}; msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
auto result = io.Request(state); auto result = io.Request(state);
@ -171,7 +171,7 @@ void TestScanVertices(query::v2::ShardRequestManagerInterface &io) {
} }
} }
void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { void TestCreateVertices(query::v2::RequestRouterInterface &io) {
using PropVal = msgs::Value; using PropVal = msgs::Value;
msgs::ExecutionState<CreateVerticesRequest> state; msgs::ExecutionState<CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices; std::vector<msgs::NewVertex> new_vertices;
@ -187,7 +187,7 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) {
MG_ASSERT(result.size() == 2); MG_ASSERT(result.size() == 2);
} }
void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { void TestCreateExpand(query::v2::RequestRouterInterface &io) {
using PropVal = msgs::Value; using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateExpandRequest> state; msgs::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands; std::vector<msgs::NewExpand> new_expands;
@ -209,20 +209,20 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) {
MG_ASSERT(responses[1].success); MG_ASSERT(responses[1].success);
} }
void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { void TestExpandOne(query::v2::RequestRouterInterface &request_router) {
msgs::ExecutionState<msgs::ExpandOneRequest> state{}; msgs::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request; msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); const auto edge_type_id = request_router.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; const auto label = msgs::Label{request_router.NameToLabel("test_label")};
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.edge_types.push_back(msgs::EdgeType{edge_type_id});
request.direction = msgs::EdgeDirection::BOTH; request.direction = msgs::EdgeDirection::BOTH;
auto result_rows = shard_request_manager.Request(state, std::move(request)); auto result_rows = request_router.Request(state, std::move(request));
MG_ASSERT(result_rows.size() == 2); MG_ASSERT(result_rows.size() == 2);
} }
template <typename ShardRequestManager> template <typename RequestRouter>
void TestAggregate(ShardRequestManager &io) {} void TestAggregate(RequestRouter &io) {}
void DoTest() { void DoTest() {
SimulatorConfig config{ SimulatorConfig config{
@ -337,7 +337,7 @@ void DoTest() {
// also get the current shard map // also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs); CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
query::v2::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io)); query::v2::RequestRouter<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
io.StartTransaction(); io.StartTransaction();
TestScanVertices(io); TestScanVertices(io);

View File

@ -30,8 +30,8 @@
#include "io/simulator/simulator_transport.hpp" #include "io/simulator/simulator_transport.hpp"
#include "machine_manager/machine_config.hpp" #include "machine_manager/machine_config.hpp"
#include "machine_manager/machine_manager.hpp" #include "machine_manager/machine_manager.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "testing_constants.hpp" #include "testing_constants.hpp"
#include "utils/print_helpers.hpp" #include "utils/print_helpers.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
@ -151,8 +151,8 @@ ShardMap TestShardMap(int n_splits, int replication_factor) {
return sm; return sm;
} }
void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request_manager, void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std::set<CompoundKey> &correctness_model,
std::set<CompoundKey> &correctness_model, CreateVertex create_vertex) { CreateVertex create_vertex) {
const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first);
const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second);
@ -166,7 +166,7 @@ void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request
query::v2::ExecutionState<msgs::CreateVerticesRequest> state; query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
auto label_id = shard_request_manager.NameToLabel("test_label"); auto label_id = request_router.NameToLabel("test_label");
msgs::NewVertex nv{.primary_key = primary_key}; msgs::NewVertex nv{.primary_key = primary_key};
nv.label_ids.push_back({label_id}); nv.label_ids.push_back({label_id});
@ -174,7 +174,7 @@ void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request
std::vector<msgs::NewVertex> new_vertices; std::vector<msgs::NewVertex> new_vertices;
new_vertices.push_back(std::move(nv)); new_vertices.push_back(std::move(nv));
auto result = shard_request_manager.Request(state, std::move(new_vertices)); auto result = request_router.Request(state, std::move(new_vertices));
RC_ASSERT(result.size() == 1); RC_ASSERT(result.size() == 1);
RC_ASSERT(!result[0].error.has_value()); RC_ASSERT(!result[0].error.has_value());
@ -182,11 +182,11 @@ void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request
correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second));
} }
void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request_manager, void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std::set<CompoundKey> &correctness_model,
std::set<CompoundKey> &correctness_model, ScanAll scan_all) { ScanAll scan_all) {
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"}; query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
auto results = shard_request_manager.Request(request); auto results = request_router.Request(request);
RC_ASSERT(results.size() == correctness_model.size()); RC_ASSERT(results.size() == correctness_model.size());
@ -247,15 +247,14 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address}); CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
WaitForShardsToInitialize(coordinator_client); WaitForShardsToInitialize(coordinator_client);
query::v2::ShardRequestManager<SimulatorTransport> shard_request_manager(std::move(coordinator_client), query::v2::RequestRouter<SimulatorTransport> request_router(std::move(coordinator_client), std::move(cli_io));
std::move(cli_io));
shard_request_manager.StartTransaction(); request_router.StartTransaction();
auto correctness_model = std::set<CompoundKey>{}; auto correctness_model = std::set<CompoundKey>{};
for (const Op &op : ops) { for (const Op &op : ops) {
std::visit([&](auto &o) { ExecuteOp(shard_request_manager, correctness_model, o); }, op.inner); std::visit([&](auto &o) { ExecuteOp(request_router, correctness_model, o); }, op.inner);
} }
// We have now completed our workload without failing any assertions, so we can // We have now completed our workload without failing any assertions, so we can

View File

@ -29,8 +29,8 @@
#include "io/simulator/simulator_transport.hpp" #include "io/simulator/simulator_transport.hpp"
#include "machine_manager/machine_config.hpp" #include "machine_manager/machine_config.hpp"
#include "machine_manager/machine_manager.hpp" #include "machine_manager/machine_manager.hpp"
#include "query/v2/request_router.hpp"
#include "query/v2/requests.hpp" #include "query/v2/requests.hpp"
#include "query/v2/shard_request_manager.hpp"
#include "utils/variant_helpers.hpp" #include "utils/variant_helpers.hpp"
namespace memgraph::tests::simulation { namespace memgraph::tests::simulation {
@ -161,8 +161,8 @@ ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards
return sm; return sm;
} }
void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_manager, void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::set<CompoundKey> &correctness_model,
std::set<CompoundKey> &correctness_model, CreateVertex create_vertex) { CreateVertex create_vertex) {
const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first);
const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second);
@ -176,7 +176,7 @@ void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_man
query::v2::ExecutionState<msgs::CreateVerticesRequest> state; query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
auto label_id = shard_request_manager.NameToLabel("test_label"); auto label_id = request_router.NameToLabel("test_label");
msgs::NewVertex nv{.primary_key = primary_key}; msgs::NewVertex nv{.primary_key = primary_key};
nv.label_ids.push_back({label_id}); nv.label_ids.push_back({label_id});
@ -184,7 +184,7 @@ void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_man
std::vector<msgs::NewVertex> new_vertices; std::vector<msgs::NewVertex> new_vertices;
new_vertices.push_back(std::move(nv)); new_vertices.push_back(std::move(nv));
auto result = shard_request_manager.Request(state, std::move(new_vertices)); auto result = request_router.Request(state, std::move(new_vertices));
MG_ASSERT(result.size() == 1); MG_ASSERT(result.size() == 1);
MG_ASSERT(!result[0].error.has_value()); MG_ASSERT(!result[0].error.has_value());
@ -192,11 +192,11 @@ void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_man
correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second));
} }
void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_manager, void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::set<CompoundKey> &correctness_model,
std::set<CompoundKey> &correctness_model, ScanAll scan_all) { ScanAll scan_all) {
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"}; query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
auto results = shard_request_manager.Request(request); auto results = request_router.Request(request);
MG_ASSERT(results.size() == correctness_model.size()); MG_ASSERT(results.size() == correctness_model.size());
@ -245,23 +245,22 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op
WaitForShardsToInitialize(coordinator_client); WaitForShardsToInitialize(coordinator_client);
auto time_after_shard_stabilization = cli_io_2.Now(); auto time_after_shard_stabilization = cli_io_2.Now();
query::v2::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), query::v2::RequestRouter<LocalTransport> request_router(std::move(coordinator_client), std::move(cli_io));
std::move(cli_io));
shard_request_manager.StartTransaction(); request_router.StartTransaction();
auto correctness_model = std::set<CompoundKey>{}; auto correctness_model = std::set<CompoundKey>{};
auto time_before_creates = cli_io_2.Now(); auto time_before_creates = cli_io_2.Now();
for (int i = 0; i < create_ops; i++) { for (int i = 0; i < create_ops; i++) {
ExecuteOp(shard_request_manager, correctness_model, CreateVertex{.first = i, .second = i}); ExecuteOp(request_router, correctness_model, CreateVertex{.first = i, .second = i});
} }
auto time_after_creates = cli_io_2.Now(); auto time_after_creates = cli_io_2.Now();
for (int i = 0; i < scan_ops; i++) { for (int i = 0; i < scan_ops; i++) {
ExecuteOp(shard_request_manager, correctness_model, ScanAll{}); ExecuteOp(request_router, correctness_model, ScanAll{});
} }
auto time_after_scan = cli_io_2.Now(); auto time_after_scan = cli_io_2.Now();

View File

@ -27,7 +27,7 @@
#include <machine_manager/machine_manager.hpp> #include <machine_manager/machine_manager.hpp>
#include <query/v2/requests.hpp> #include <query/v2/requests.hpp>
#include "io/rsm/rsm_client.hpp" #include "io/rsm/rsm_client.hpp"
#include "query/v2/shard_request_manager.hpp" #include "query/v2/request_router.hpp"
#include "storage/v3/id_types.hpp" #include "storage/v3/id_types.hpp"
#include "storage/v3/schemas.hpp" #include "storage/v3/schemas.hpp"
@ -109,19 +109,19 @@ ShardMap TestShardMap() {
return sm; return sm;
} }
template <typename ShardRequestManager> template <typename RequestRouter>
void TestScanAll(ShardRequestManager &shard_request_manager) { void TestScanAll(RequestRouter &request_router) {
query::v2::ExecutionState<msgs::ScanVerticesRequest> state{.label = kLabelName}; query::v2::ExecutionState<msgs::ScanVerticesRequest> state{.label = kLabelName};
auto result = shard_request_manager.Request(state); auto result = request_router.Request(state);
EXPECT_EQ(result.size(), 2); EXPECT_EQ(result.size(), 2);
} }
void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_manager) { void TestCreateVertices(query::v2::RequestRouterInterface &request_router) {
using PropVal = msgs::Value; using PropVal = msgs::Value;
query::v2::ExecutionState<msgs::CreateVerticesRequest> state; query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices; std::vector<msgs::NewVertex> new_vertices;
auto label_id = shard_request_manager.NameToLabel(kLabelName); auto label_id = request_router.NameToLabel(kLabelName);
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}};
a1.label_ids.push_back({label_id}); a1.label_ids.push_back({label_id});
msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}}; msgs::NewVertex a2{.primary_key = {PropVal(int64_t(13)), PropVal(int64_t(13))}};
@ -129,18 +129,18 @@ void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_m
new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a1));
new_vertices.push_back(std::move(a2)); new_vertices.push_back(std::move(a2));
auto result = shard_request_manager.Request(state, std::move(new_vertices)); auto result = request_router.Request(state, std::move(new_vertices));
EXPECT_EQ(result.size(), 1); EXPECT_EQ(result.size(), 1);
EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message; EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message;
} }
void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_manager) { void TestCreateExpand(query::v2::RequestRouterInterface &request_router) {
using PropVal = msgs::Value; using PropVal = msgs::Value;
query::v2::ExecutionState<msgs::CreateExpandRequest> state; query::v2::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands; std::vector<msgs::NewExpand> new_expands;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); const auto edge_type_id = request_router.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; const auto label = msgs::Label{request_router.NameToLabel("test_label")};
const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; const msgs::VertexId vertex_id_1{label, {PropVal(int64_t(0)), PropVal(int64_t(0))}};
const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; const msgs::VertexId vertex_id_2{label, {PropVal(int64_t(13)), PropVal(int64_t(13))}};
msgs::NewExpand expand_1{ msgs::NewExpand expand_1{
@ -150,27 +150,27 @@ void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_man
new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_1));
new_expands.push_back(std::move(expand_2)); new_expands.push_back(std::move(expand_2));
auto responses = shard_request_manager.Request(state, std::move(new_expands)); auto responses = request_router.Request(state, std::move(new_expands));
MG_ASSERT(responses.size() == 1); MG_ASSERT(responses.size() == 1);
MG_ASSERT(!responses[0].error.has_value()); MG_ASSERT(!responses[0].error.has_value());
} }
void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { void TestExpandOne(query::v2::RequestRouterInterface &request_router) {
query::v2::ExecutionState<msgs::ExpandOneRequest> state{}; query::v2::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request; msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); const auto edge_type_id = request_router.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; const auto label = msgs::Label{request_router.NameToLabel("test_label")};
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.edge_types.push_back(msgs::EdgeType{edge_type_id});
request.direction = msgs::EdgeDirection::BOTH; request.direction = msgs::EdgeDirection::BOTH;
auto result_rows = shard_request_manager.Request(state, std::move(request)); auto result_rows = request_router.Request(state, std::move(request));
MG_ASSERT(result_rows.size() == 1); MG_ASSERT(result_rows.size() == 1);
MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1); MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1);
MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1); MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1);
} }
template <typename ShardRequestManager> template <typename RequestRouter>
void TestAggregate(ShardRequestManager &shard_request_manager) {} void TestAggregate(RequestRouter &request_router) {}
MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr, MachineManager<LocalTransport> MkMm(LocalSystem &local_system, std::vector<Address> coordinator_addresses, Address addr,
ShardMap shard_map) { ShardMap shard_map) {
@ -226,14 +226,13 @@ TEST(MachineManager, BasicFunctionality) {
CoordinatorClient<LocalTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address}); CoordinatorClient<LocalTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
query::v2::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), query::v2::RequestRouter<LocalTransport> request_router(std::move(coordinator_client), std::move(cli_io));
std::move(cli_io));
shard_request_manager.StartTransaction(); request_router.StartTransaction();
TestCreateVertices(shard_request_manager); TestCreateVertices(request_router);
TestScanAll(shard_request_manager); TestScanAll(request_router);
TestCreateExpand(shard_request_manager); TestCreateExpand(request_router);
TestExpandOne(shard_request_manager); TestExpandOne(request_router);
local_system.ShutDown(); local_system.ShutDown();
}; };