diff --git a/src/glue/v2/communication.cpp b/src/glue/v2/communication.cpp index eedf699bb..3406105b0 100644 --- a/src/glue/v2/communication.cpp +++ b/src/glue/v2/communication.cpp @@ -72,7 +72,7 @@ query::v2::TypedValue ToTypedValue(const Value &value) { } communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View /*view*/) { auto id = communication::bolt::Id::FromUint(0); @@ -92,7 +92,7 @@ communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAcces } communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication auto id = communication::bolt::Id::FromUint(0); @@ -109,15 +109,15 @@ communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &e } communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/, - const msgs::ShardRequestManagerInterface * /*shard_request_manager*/, + const query::v2::ShardRequestManagerInterface * /*shard_request_manager*/, storage::v3::View /*view*/) { // TODO(jbajic) Fix bolt communication MG_ASSERT(false, "Path is unimplemented!"); return {}; } -Value ToBoltValue(const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager, - storage::v3::View view) { +Value ToBoltValue(const query::v2::TypedValue &value, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) { switch (value.type()) { case query::v2::TypedValue::Type::Null: return {}; diff --git a/src/glue/v2/communication.hpp b/src/glue/v2/communication.hpp index a20162176..817ff02bd 100644 --- a/src/glue/v2/communication.hpp +++ b/src/glue/v2/communication.hpp @@ -32,39 +32,39 @@ namespace memgraph::glue::v2 { /// @param storage::v3::VertexAccessor for converting to /// communication::bolt::Vertex. -/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names. +/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting label and property names. /// @param storage::v3::View for deciding which vertex attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); /// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge. -/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. +/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting edge type and property names. /// @param storage::v3::View for deciding which edge attributes are visible. /// /// @throw std::bad_alloc communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); /// @param query::v2::Path for converting to communication::bolt::Path. -/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); /// @param query::v2::TypedValue for converting to communication::bolt::Value. -/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. +/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge. /// @param storage::v3::View for ToBoltVertex and ToBoltEdge. /// /// @throw std::bad_alloc communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value); @@ -76,7 +76,7 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val communication::bolt::Value ToBoltValue(msgs::Value value); communication::bolt::Value ToBoltValue(msgs::Value value, - const msgs::ShardRequestManagerInterface *shard_request_manager, + const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view); } // namespace memgraph::glue::v2 diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 5903e65ad..565f9940c 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -497,7 +497,8 @@ class BoltSession final : public memgraph::communication::bolt::Session &values) { @@ -512,7 +513,7 @@ class BoltSession final : public memgraph::communication::bolt::Session> props, - const msgs::ShardRequestManagerInterface *manager) + const ShardRequestManagerInterface *manager) : vertex(std::move(v)), properties(std::move(props)), manager_(manager) {} VertexAccessor::VertexAccessor(Vertex v, std::map &&props, - const msgs::ShardRequestManagerInterface *manager) + const ShardRequestManagerInterface *manager) : vertex(std::move(v)), manager_(manager) { properties.reserve(props.size()); for (auto &[id, value] : props) { @@ -57,7 +57,7 @@ VertexAccessor::VertexAccessor(Vertex v, std::map &&props, } VertexAccessor::VertexAccessor(Vertex v, const std::map &props, - const msgs::ShardRequestManagerInterface *manager) + const ShardRequestManagerInterface *manager) : vertex(std::move(v)), manager_(manager) { properties.reserve(props.size()); for (const auto &[id, value] : props) { diff --git a/src/query/v2/accessors.hpp b/src/query/v2/accessors.hpp index 8e10c865d..ca7ec999d 100644 --- a/src/query/v2/accessors.hpp +++ b/src/query/v2/accessors.hpp @@ -24,24 +24,24 @@ #include "utils/memory.hpp" #include "utils/memory_tracker.hpp" -namespace memgraph::msgs { +namespace memgraph::query::v2 { class ShardRequestManagerInterface; -} // namespace memgraph::msgs +} // namespace memgraph::query::v2 namespace memgraph::query::v2::accessors { -using Value = memgraph::msgs::Value; -using Edge = memgraph::msgs::Edge; -using Vertex = memgraph::msgs::Vertex; -using Label = memgraph::msgs::Label; -using PropertyId = memgraph::msgs::PropertyId; -using EdgeTypeId = memgraph::msgs::EdgeTypeId; +using Value = msgs::Value; +using Edge = msgs::Edge; +using Vertex = msgs::Vertex; +using Label = msgs::Label; +using PropertyId = msgs::PropertyId; +using EdgeTypeId = msgs::EdgeTypeId; class VertexAccessor; class EdgeAccessor final { public: - explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager); + explicit EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager); [[nodiscard]] EdgeTypeId EdgeType() const; @@ -69,7 +69,7 @@ class EdgeAccessor final { private: Edge edge; - const msgs::ShardRequestManagerInterface *manager_; + const ShardRequestManagerInterface *manager_; }; class VertexAccessor final { @@ -78,10 +78,10 @@ class VertexAccessor final { using Label = msgs::Label; using VertexId = msgs::VertexId; VertexAccessor(Vertex v, std::vector> props, - const msgs::ShardRequestManagerInterface *manager); + const ShardRequestManagerInterface *manager); - VertexAccessor(Vertex v, std::map &&props, const msgs::ShardRequestManagerInterface *manager); - VertexAccessor(Vertex v, const std::map &props, const msgs::ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, std::map &&props, const ShardRequestManagerInterface *manager); + VertexAccessor(Vertex v, const std::map &props, const ShardRequestManagerInterface *manager); [[nodiscard]] Label PrimaryLabel() const; @@ -150,7 +150,7 @@ class VertexAccessor final { private: Vertex vertex; std::vector> properties; - const msgs::ShardRequestManagerInterface *manager_; + const ShardRequestManagerInterface *manager_; }; // inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); } diff --git a/src/query/v2/bindings/eval.hpp b/src/query/v2/bindings/eval.hpp index 8f1b19384..584e88922 100644 --- a/src/query/v2/bindings/eval.hpp +++ b/src/query/v2/bindings/eval.hpp @@ -24,27 +24,26 @@ #include "storage/v3/result.hpp" #include "storage/v3/view.hpp" -namespace memgraph::msgs { -class ShardRequestManagerInterface; -} // namespace memgraph::msgs - namespace memgraph::query::v2 { +class ShardRequestManagerInterface; + inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); }; namespace detail { class Callable { public: - auto operator()(const memgraph::storage::v3::PropertyValue &val) const { - return memgraph::storage::v3::PropertyToTypedValue(val); + auto operator()(const storage::v3::PropertyValue &val) const { + return storage::v3::PropertyToTypedValue(val); }; - auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const { + auto operator()(const msgs::Value &val, ShardRequestManagerInterface *manager) const { return ValueToTypedValue(val, manager); }; }; } // namespace detail -using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator< - TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View, - storage::v3::LabelId, msgs::Value, detail::Callable, common::ErrorCode, memgraph::expr::QueryEngineTag>; +using ExpressionEvaluator = + expr::ExpressionEvaluator; } // namespace memgraph::query::v2 diff --git a/src/query/v2/context.hpp b/src/query/v2/context.hpp index 338546f4d..388342349 100644 --- a/src/query/v2/context.hpp +++ b/src/query/v2/context.hpp @@ -60,8 +60,8 @@ struct EvaluationContext { mutable std::unordered_map counters; }; -inline std::vector NamesToProperties( - const std::vector &property_names, msgs::ShardRequestManagerInterface *shard_request_manager) { +inline std::vector NamesToProperties(const std::vector &property_names, + ShardRequestManagerInterface *shard_request_manager) { std::vector properties; // TODO Fix by using reference properties.reserve(property_names.size()); @@ -74,7 +74,7 @@ inline std::vector NamesToProperties( } inline std::vector NamesToLabels(const std::vector &label_names, - msgs::ShardRequestManagerInterface *shard_request_manager) { + ShardRequestManagerInterface *shard_request_manager) { std::vector labels; labels.reserve(label_names.size()); // TODO Fix by using reference @@ -97,7 +97,7 @@ struct ExecutionContext { plan::ProfilingStats *stats_root{nullptr}; ExecutionStats execution_stats; utils::AsyncTimer timer; - msgs::ShardRequestManagerInterface *shard_request_manager{nullptr}; + ShardRequestManagerInterface *shard_request_manager{nullptr}; IdAllocator *edge_ids_alloc; }; diff --git a/src/query/v2/conversions.hpp b/src/query/v2/conversions.hpp index 10299c919..c18ba8cc0 100644 --- a/src/query/v2/conversions.hpp +++ b/src/query/v2/conversions.hpp @@ -17,7 +17,7 @@ namespace memgraph::query::v2 { -inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) { +inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManagerInterface *manager) { using Value = msgs::Value; switch (value.type) { case Value::Type::Null: diff --git a/src/query/v2/cypher_query_interpreter.cpp b/src/query/v2/cypher_query_interpreter.cpp index d365a53fb..908ee36cb 100644 --- a/src/query/v2/cypher_query_interpreter.cpp +++ b/src/query/v2/cypher_query_interpreter.cpp @@ -118,7 +118,7 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - msgs::ShardRequestManagerInterface *shard_manager, + ShardRequestManagerInterface *shard_manager, const std::vector &predefined_identifiers) { auto vertex_counts = plan::MakeVertexCountCache(shard_manager); auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers); @@ -130,7 +130,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - msgs::ShardRequestManagerInterface *shard_manager, + ShardRequestManagerInterface *shard_manager, const std::vector &predefined_identifiers) { std::optional::Accessor> plan_cache_access; if (plan_cache) { diff --git a/src/query/v2/cypher_query_interpreter.hpp b/src/query/v2/cypher_query_interpreter.hpp index 74dbe85d5..b7f63ab8f 100644 --- a/src/query/v2/cypher_query_interpreter.hpp +++ b/src/query/v2/cypher_query_interpreter.hpp @@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan { }; std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, - msgs::ShardRequestManagerInterface *shard_manager, + ShardRequestManagerInterface *shard_manager, const std::vector &predefined_identifiers); /** @@ -145,7 +145,7 @@ std::unique_ptr MakeLogicalPlan(AstStorage ast_storage, CypherQuery */ std::shared_ptr CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query, const Parameters ¶meters, utils::SkipList *plan_cache, - msgs::ShardRequestManagerInterface *shard_manager, + ShardRequestManagerInterface *shard_manager, const std::vector &predefined_identifiers = {}); } // namespace memgraph::query::v2 diff --git a/src/query/v2/interpret/awesome_memgraph_functions.hpp b/src/query/v2/interpret/awesome_memgraph_functions.hpp index 134f05d7d..1fd351cd8 100644 --- a/src/query/v2/interpret/awesome_memgraph_functions.hpp +++ b/src/query/v2/interpret/awesome_memgraph_functions.hpp @@ -20,12 +20,10 @@ #include "storage/v3/view.hpp" #include "utils/memory.hpp" -namespace memgraph::msgs { -class ShardRequestManagerInterface; -} // namespace memgraph::msgs - namespace memgraph::query::v2 { +class ShardRequestManagerInterface; + namespace { const char kStartsWith[] = "STARTSWITH"; const char kEndsWith[] = "ENDSWITH"; @@ -36,7 +34,7 @@ const char kId[] = "ID"; struct FunctionContext { // TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage. // DbAccessor *db_accessor; - msgs::ShardRequestManagerInterface *manager; + ShardRequestManagerInterface *manager; utils::MemoryResource *memory; int64_t timestamp; std::unordered_map *counters; diff --git a/src/query/v2/interpreter.cpp b/src/query/v2/interpreter.cpp index 045d94709..f9dc37184 100644 --- a/src/query/v2/interpreter.cpp +++ b/src/query/v2/interpreter.cpp @@ -143,7 +143,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler { /// @throw QueryRuntimeException if an error ocurred. Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters ¶meters, - msgs::ShardRequestManagerInterface *manager) { + ShardRequestManagerInterface *manager) { // Empty frame for evaluation of password expression. This is OK since // password should be either null or string literal and it's evaluation // should not depend on frame. @@ -312,7 +312,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa } Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters ¶meters, - InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager, + InterpreterContext *interpreter_context, ShardRequestManagerInterface *manager, std::vector *notifications) { expr::Frame frame(0); SymbolTable symbol_table; @@ -448,7 +448,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters & } Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters ¶meters, - msgs::ShardRequestManagerInterface *manager) { + ShardRequestManagerInterface *manager) { expr::Frame frame(0); SymbolTable symbol_table; EvaluationContext evaluation_context; @@ -649,7 +649,7 @@ struct PullPlanVector { struct PullPlan { explicit PullPlan(std::shared_ptr plan, const Parameters ¶meters, bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - msgs::ShardRequestManagerInterface *shard_request_manager = nullptr, + ShardRequestManagerInterface *shard_request_manager = nullptr, // TriggerContextCollector *trigger_context_collector = nullptr, std::optional memory_limit = {}); std::optional Pull(AnyStream *stream, std::optional n, @@ -679,7 +679,7 @@ struct PullPlan { PullPlan::PullPlan(const std::shared_ptr plan, const Parameters ¶meters, const bool is_profile_query, DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory, - msgs::ShardRequestManagerInterface *shard_request_manager, const std::optional memory_limit) + ShardRequestManagerInterface *shard_request_manager, const std::optional memory_limit) : plan_(plan), cursor_(plan->plan().MakeCursor(execution_memory)), frame_(plan->symbol_table().max_position(), execution_memory), @@ -804,7 +804,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_ auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()}; auto query_io = interpreter_context_->io.ForkLocal(random_uuid); - shard_request_manager_ = std::make_unique>( + shard_request_manager_ = std::make_unique>( coordinator::CoordinatorClient( query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}), std::move(query_io)); @@ -881,7 +881,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper) PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, std::vector *notifications, - msgs::ShardRequestManagerInterface *shard_request_manager) { + ShardRequestManagerInterface *shard_request_manager) { // TriggerContextCollector *trigger_context_collector = nullptr) { auto *cypher_query = utils::Downcast(parsed_query.query); @@ -942,7 +942,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, - msgs::ShardRequestManagerInterface *shard_request_manager, + ShardRequestManagerInterface *shard_request_manager, utils::MemoryResource *execution_memory) { const std::string kExplainQueryStart = "explain "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart), @@ -991,7 +991,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - msgs::ShardRequestManagerInterface *shard_request_manager = nullptr) { + ShardRequestManagerInterface *shard_request_manager = nullptr) { const std::string kProfileQueryStart = "profile "; MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart), @@ -1185,7 +1185,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction, std::map *summary, InterpreterContext *interpreter_context, DbAccessor *dba, utils::MemoryResource *execution_memory, - msgs::ShardRequestManagerInterface *manager) { + ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw UserModificationInMulticommandTxException(); } @@ -1221,7 +1221,7 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, std::vector *notifications, InterpreterContext *interpreter_context, - msgs::ShardRequestManagerInterface *manager) { + ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw ReplicationModificationInMulticommandTxException(); } @@ -1317,7 +1317,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli } PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction, - msgs::ShardRequestManagerInterface *manager) { + ShardRequestManagerInterface *manager) { if (in_explicit_transaction) { throw SettingConfigInMulticommandTxException{}; } diff --git a/src/query/v2/interpreter.hpp b/src/query/v2/interpreter.hpp index dc413ef44..afc298a0c 100644 --- a/src/query/v2/interpreter.hpp +++ b/src/query/v2/interpreter.hpp @@ -296,7 +296,7 @@ class Interpreter final { */ void Abort(); - const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } + const ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); } private: struct QueryExecution { @@ -342,7 +342,7 @@ class Interpreter final { // move this unique_ptr into a shared_ptr. std::unique_ptr db_accessor_; std::optional execution_db_accessor_; - std::unique_ptr shard_request_manager_; + std::unique_ptr shard_request_manager_; bool in_explicit_transaction_{false}; bool expect_rollback_{false}; diff --git a/src/query/v2/plan/operator.cpp b/src/query/v2/plan/operator.cpp index d82d83034..30db1012e 100644 --- a/src/query/v2/plan/operator.cpp +++ b/src/query/v2/plan/operator.cpp @@ -252,7 +252,7 @@ class DistributedCreateNodeCursor : public Cursor { std::vector nodes_info_; std::vector>> src_vertex_props_; std::vector primary_keys_; - msgs::ExecutionState state_; + ExecutionState state_; }; bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) { @@ -365,7 +365,7 @@ class ScanAllCursor : public Cursor { std::optional vertices_it_; const char *op_name_; std::vector current_batch; - msgs::ExecutionState request_state; + ExecutionState request_state; }; class DistributedScanAllAndFilterCursor : public Cursor { @@ -386,7 +386,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { using VertexAccessor = accessors::VertexAccessor; - bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { + bool MakeRequest(ShardRequestManagerInterface &shard_manager, ExecutionContext &context) { { SCOPED_REQUEST_WAIT_PROFILE; current_batch = shard_manager.Request(request_state_); @@ -403,7 +403,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { if (MustAbort(context)) { throw HintedAbortError(); } - using State = msgs::ExecutionState; + using State = ExecutionState; if (request_state_.state == State::INITIALIZING) { if (!input_cursor_->Pull(frame, context)) { @@ -430,7 +430,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { void ResetExecutionState() { current_batch.clear(); current_vertex_it = current_batch.end(); - request_state_ = msgs::ExecutionState{}; + request_state_ = ExecutionState{}; } void Reset() override { @@ -444,7 +444,7 @@ class DistributedScanAllAndFilterCursor : public Cursor { const char *op_name_; std::vector current_batch; std::vector::iterator current_vertex_it; - msgs::ExecutionState request_state_; + ExecutionState request_state_; std::optional label_; std::optional> property_expression_pair_; std::optional> filter_expressions_; @@ -2426,7 +2426,7 @@ class DistributedCreateExpandCursor : public Cursor { const UniqueCursorPtr input_cursor_; const CreateExpand &self_; - msgs::ExecutionState state_; + ExecutionState state_; }; class DistributedExpandCursor : public Cursor { @@ -2473,7 +2473,7 @@ class DistributedExpandCursor : public Cursor { request.edge_properties.emplace(); request.src_vertices.push_back(get_dst_vertex(edge, direction)); request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN; - msgs::ExecutionState request_state; + ExecutionState request_state; auto result_rows = context.shard_request_manager->Request(request_state, std::move(request)); MG_ASSERT(result_rows.size() == 1); auto &result_row = result_rows.front(); @@ -2499,7 +2499,7 @@ class DistributedExpandCursor : public Cursor { // to not fetch any properties of the edges request.edge_properties.emplace(); request.src_vertices.push_back(vertex.Id()); - msgs::ExecutionState request_state; + ExecutionState request_state; auto result_rows = std::invoke([&context, &request_state, &request]() mutable { SCOPED_REQUEST_WAIT_PROFILE; return context.shard_request_manager->Request(request_state, std::move(request)); diff --git a/src/query/v2/plan/pretty_print.cpp b/src/query/v2/plan/pretty_print.cpp index b756a6299..b7ab6da6e 100644 --- a/src/query/v2/plan/pretty_print.cpp +++ b/src/query/v2/plan/pretty_print.cpp @@ -19,7 +19,7 @@ namespace memgraph::query::v2::plan { -PlanPrinter::PlanPrinter(const msgs::ShardRequestManagerInterface *request_manager, std::ostream *out) +PlanPrinter::PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out) : request_manager_(request_manager), out_(out) {} #define PRE_VISIT(TOp) \ @@ -263,14 +263,14 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string --depth_; } -void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, +void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out) { PlanPrinter printer(&request_manager, out); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(printer); } -nlohmann::json PlanToJson(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { +nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { impl::PlanToJsonVisitor visitor(&request_manager); // FIXME(mtomic): We should make visitors that take const arguments. const_cast(plan_root)->Accept(visitor); @@ -349,15 +349,15 @@ json ToJson(const utils::Bound &bound) { json ToJson(const Symbol &symbol) { return symbol.name(); } -json ToJson(storage::v3::EdgeTypeId edge_type, const msgs::ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager) { return request_manager.EdgeTypeToName(edge_type); } -json ToJson(storage::v3::LabelId label, const msgs::ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager) { return request_manager.LabelToName(label); } -json ToJson(storage::v3::PropertyId property, const msgs::ShardRequestManagerInterface &request_manager) { +json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager) { return request_manager.PropertyToName(property); } @@ -369,7 +369,7 @@ json ToJson(NamedExpression *nexpr) { } json ToJson(const std::vector> &properties, - const msgs::ShardRequestManagerInterface &request_manager) { + const ShardRequestManagerInterface &request_manager) { json json; for (const auto &prop_pair : properties) { json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second)); @@ -377,7 +377,7 @@ json ToJson(const std::vector> return json; } -json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerInterface &request_manager) { +json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager) { json self; self["symbol"] = ToJson(node_info.symbol); self["labels"] = ToJson(node_info.labels, request_manager); @@ -386,7 +386,7 @@ json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerIn return self; } -json ToJson(const EdgeCreationInfo &edge_info, const msgs::ShardRequestManagerInterface &request_manager) { +json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager) { json self; self["symbol"] = ToJson(edge_info.symbol); const auto *props = std::get_if(&edge_info.properties); diff --git a/src/query/v2/plan/pretty_print.hpp b/src/query/v2/plan/pretty_print.hpp index c7442b196..8485723a3 100644 --- a/src/query/v2/plan/pretty_print.hpp +++ b/src/query/v2/plan/pretty_print.hpp @@ -30,17 +30,17 @@ class LogicalOperator; /// ShardRequestManager is needed for resolving label and property names. /// Note that `plan_root` isn't modified, but we can't take it as a const /// because we don't have support for visiting a const LogicalOperator. -void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, +void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root, std::ostream *out); /// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`. -inline void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { +inline void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) { PrettyPrint(request_manager, plan_root, &std::cout); } /// Convert a `LogicalOperator` plan to a JSON representation. /// DbAccessor is needed for resolving label and property names. -nlohmann::json PlanToJson(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root); +nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root); class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { public: @@ -48,7 +48,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { using HierarchicalLogicalOperatorVisitor::PreVisit; using HierarchicalLogicalOperatorVisitor::Visit; - PlanPrinter(const msgs::ShardRequestManagerInterface *request_manager, std::ostream *out); + PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out); bool DefaultPreVisit() override; @@ -115,7 +115,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor { void Branch(LogicalOperator &op, const std::string &branch_name = ""); int64_t depth_{0}; - const msgs::ShardRequestManagerInterface *request_manager_{nullptr}; + const ShardRequestManagerInterface *request_manager_{nullptr}; std::ostream *out_{nullptr}; }; @@ -133,20 +133,20 @@ nlohmann::json ToJson(const utils::Bound &bound); nlohmann::json ToJson(const Symbol &symbol); -nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const msgs::ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager); -nlohmann::json ToJson(storage::v3::LabelId label, const msgs::ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager); -nlohmann::json ToJson(storage::v3::PropertyId property, const msgs::ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(NamedExpression *nexpr); nlohmann::json ToJson(const std::vector> &properties, - const msgs::ShardRequestManagerInterface &request_manager); + const ShardRequestManagerInterface &request_manager); -nlohmann::json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager); -nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const msgs::ShardRequestManagerInterface &request_manager); +nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager); nlohmann::json ToJson(const Aggregate::Element &elem); @@ -161,8 +161,7 @@ nlohmann::json ToJson(const std::vector &items, Args &&...args) { class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { public: - explicit PlanToJsonVisitor(const msgs::ShardRequestManagerInterface *request_manager) - : request_manager_(request_manager) {} + explicit PlanToJsonVisitor(const ShardRequestManagerInterface *request_manager) : request_manager_(request_manager) {} using HierarchicalLogicalOperatorVisitor::PostVisit; using HierarchicalLogicalOperatorVisitor::PreVisit; @@ -218,7 +217,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor { protected: nlohmann::json output_; - const msgs::ShardRequestManagerInterface *request_manager_; + const ShardRequestManagerInterface *request_manager_; nlohmann::json PopOutput() { nlohmann::json tmp; diff --git a/src/query/v2/plan/vertex_count_cache.hpp b/src/query/v2/plan/vertex_count_cache.hpp index a7bfbdf85..47a10ba3e 100644 --- a/src/query/v2/plan/vertex_count_cache.hpp +++ b/src/query/v2/plan/vertex_count_cache.hpp @@ -57,7 +57,7 @@ class VertexCountCache { bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; } - msgs::ShardRequestManagerInterface *shard_request_manager_; + ShardRequestManagerInterface *shard_request_manager_; }; template diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 20bae7b97..68a10f384 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -42,13 +42,12 @@ #include "storage/v3/value_conversions.hpp" #include "utils/result.hpp" -namespace memgraph::msgs { +namespace memgraph::query::v2 { template class RsmStorageClientManager { public: - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; - using LabelId = memgraph::storage::v3::LabelId; + using CompoundKey = io::rsm::ShardRsmKey; + using Shard = coordinator::Shard; RsmStorageClientManager() = default; RsmStorageClientManager(const RsmStorageClientManager &) = delete; RsmStorageClientManager(RsmStorageClientManager &&) = delete; @@ -74,8 +73,8 @@ class RsmStorageClientManager { template struct ExecutionState { - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; + using CompoundKey = io::rsm::ShardRsmKey; + using Shard = coordinator::Shard; enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED }; // label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label @@ -85,7 +84,7 @@ struct ExecutionState { // of a shard. One example is ScanAll, where we only require the field label. std::optional key; // Transaction id to be filled by the ShardRequestManager implementation - memgraph::coordinator::Hlc transaction_id; + coordinator::Hlc transaction_id; // Initialized by ShardRequestManager implementation. This vector is filled with the shards that // the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes @@ -104,7 +103,7 @@ struct ExecutionState { class ShardRequestManagerInterface { public: - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; + using VertexAccessor = query::v2::accessors::VertexAccessor; ShardRequestManagerInterface() = default; ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; @@ -115,38 +114,38 @@ class ShardRequestManagerInterface { virtual void StartTransaction() = 0; virtual void Commit() = 0; - virtual std::vector Request(ExecutionState &state) = 0; - virtual std::vector Request(ExecutionState &state, - std::vector new_vertices) = 0; - virtual std::vector Request(ExecutionState &state, - ExpandOneRequest request) = 0; - virtual std::vector Request(ExecutionState &state, - std::vector new_edges) = 0; + virtual std::vector Request(ExecutionState &state) = 0; + virtual std::vector Request(ExecutionState &state, + std::vector new_vertices) = 0; + virtual std::vector Request(ExecutionState &state, + msgs::ExpandOneRequest request) = 0; + virtual std::vector Request(ExecutionState &state, + std::vector new_edges) = 0; virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0; - virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; - virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; - virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; - virtual bool IsPrimaryLabel(LabelId label) const = 0; - virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0; + virtual const std::string &PropertyToName(storage::v3::PropertyId prop) const = 0; + virtual const std::string &LabelToName(storage::v3::LabelId label) const = 0; + virtual const std::string &EdgeTypeToName(storage::v3::EdgeTypeId type) const = 0; + virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0; + virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0; }; // TODO(kostasrim)rename this class template template class ShardRequestManager : public ShardRequestManagerInterface { public: - using StorageClient = - memgraph::coordinator::RsmClient; - using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests; - using CoordinatorClient = memgraph::coordinator::CoordinatorClient; - using Address = memgraph::io::Address; - using Shard = memgraph::coordinator::Shard; - using ShardMap = memgraph::coordinator::ShardMap; - using CompoundKey = memgraph::coordinator::PrimaryKey; - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; - ShardRequestManager(CoordinatorClient coord, memgraph::io::Io &&io) + using StorageClient = coordinator::RsmClient; + using CoordinatorWriteRequests = coordinator::CoordinatorWriteRequests; + using CoordinatorClient = coordinator::CoordinatorClient; + using Address = io::Address; + using Shard = coordinator::Shard; + using ShardMap = coordinator::ShardMap; + using CompoundKey = coordinator::PrimaryKey; + using VertexAccessor = query::v2::accessors::VertexAccessor; + ShardRequestManager(CoordinatorClient coord, io::Io &&io) : coord_cli_(std::move(coord)), io_(std::move(io)) {} ShardRequestManager(const ShardRequestManager &) = delete; @@ -157,14 +156,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { ~ShardRequestManager() override {} void StartTransaction() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; + coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; auto write_res = coord_cli_.SendWriteRequest(write_req); if (write_res.HasError()) { throw std::runtime_error("HLC request failed"); } auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get(coordinator_write_response); + auto hlc_response = std::get(coordinator_write_response); // Transaction ID to be used later... transaction_id_ = hlc_response.new_hlc; @@ -176,14 +175,14 @@ class ShardRequestManager : public ShardRequestManagerInterface { } void Commit() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; + coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; CoordinatorWriteRequests write_req = req; auto write_res = coord_cli_.SendWriteRequest(write_req); if (write_res.HasError()) { throw std::runtime_error("HLC request for commit failed"); } auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get(coordinator_write_response); + auto hlc_response = std::get(coordinator_write_response); if (hlc_response.fresher_shard_map) { shards_map_ = hlc_response.fresher_shard_map.value(); @@ -204,8 +203,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { if (commit_response.HasError()) { throw std::runtime_error("Commit request timed out"); } - WriteResponses write_response_variant = commit_response.GetValue(); - auto &response = std::get(write_response_variant); + msgs::WriteResponses write_response_variant = commit_response.GetValue(); + auto &response = std::get(write_response_variant); if (response.error) { throw std::runtime_error("Commit request did not succeed"); } @@ -225,17 +224,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { return shards_map_.GetLabelId(name).value(); } - const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { + const std::string &PropertyToName(storage::v3::PropertyId id) const override { return properties_.IdToName(id.AsUint()); } - const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override { - return labels_.IdToName(id.AsUint()); - } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override { + const std::string &LabelToName(storage::v3::LabelId id) const override { return labels_.IdToName(id.AsUint()); } + const std::string &EdgeTypeToName(storage::v3::EdgeTypeId id) const override { return edge_types_.IdToName(id.AsUint()); } - bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { + bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const override { const auto schema_it = shards_map_.schemas.find(primary_label); MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint()); @@ -244,12 +241,12 @@ class ShardRequestManager : public ShardRequestManagerInterface { }) != schema_it->second.end(); } - bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); } + bool IsPrimaryLabel(storage::v3::LabelId label) const override { return shards_map_.label_spaces.contains(label); } // TODO(kostasrim) Simplify return result - std::vector Request(ExecutionState &state) override { + std::vector Request(ExecutionState &state) override { MaybeInitializeExecutionState(state); - std::vector responses; + std::vector responses; SendAllRequests(state); auto all_requests_gathered = [](auto &paginated_rsp_tracker) { @@ -273,11 +270,11 @@ class ShardRequestManager : public ShardRequestManagerInterface { return PostProcess(std::move(responses)); } - std::vector Request(ExecutionState &state, - std::vector new_vertices) override { + std::vector Request(ExecutionState &state, + std::vector new_vertices) override { MG_ASSERT(!new_vertices.empty()); MaybeInitializeExecutionState(state, new_vertices); - std::vector responses; + std::vector responses; auto &shard_cache_ref = state.shard_cache; // 1. Send the requests. @@ -294,22 +291,22 @@ class ShardRequestManager : public ShardRequestManagerInterface { return responses; } - std::vector Request(ExecutionState &state, - std::vector new_edges) override { + std::vector Request(ExecutionState &state, + std::vector new_edges) override { MG_ASSERT(!new_edges.empty()); MaybeInitializeExecutionState(state, new_edges); - std::vector responses; + std::vector responses; auto &shard_cache_ref = state.shard_cache; size_t id{0}; for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { auto &storage_client = GetStorageClientForShard(*shard_it); - WriteRequests req = state.requests[id]; + msgs::WriteRequests req = state.requests[id]; auto write_response_result = storage_client.SendWriteRequest(std::move(req)); if (write_response_result.HasError()) { throw std::runtime_error("CreateVertices request timedout"); } - WriteResponses response_variant = write_response_result.GetValue(); - CreateExpandResponse mapped_response = std::get(response_variant); + msgs::WriteResponses response_variant = write_response_result.GetValue(); + msgs::CreateExpandResponse mapped_response = std::get(response_variant); if (mapped_response.error) { throw std::runtime_error("CreateExpand request did not succeed"); @@ -322,14 +319,15 @@ class ShardRequestManager : public ShardRequestManagerInterface { return responses; } - std::vector Request(ExecutionState &state, ExpandOneRequest request) override { + std::vector Request(ExecutionState &state, + msgs::ExpandOneRequest request) override { // TODO(kostasrim)Update to limit the batch size here // Expansions of the destination must be handled by the caller. For example // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) // For each vertex U, the ExpandOne will result in . The destination vertex and its properties // must be fetched again with an ExpandOne(Edges.dst) MaybeInitializeExecutionState(state, std::move(request)); - std::vector responses; + std::vector responses; auto &shard_cache_ref = state.shard_cache; // 1. Send the requests. @@ -339,10 +337,11 @@ class ShardRequestManager : public ShardRequestManagerInterface { do { AwaitOnResponses(state, responses); } while (!state.shard_cache.empty()); - std::vector result_rows; - const auto total_row_count = std::accumulate( - responses.begin(), responses.end(), 0, - [](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); }); + std::vector result_rows; + const auto total_row_count = std::accumulate(responses.begin(), responses.end(), 0, + [](const int64_t partial_count, const msgs::ExpandOneResponse &resp) { + return partial_count + resp.result.size(); + }); result_rows.reserve(total_row_count); for (auto &response : responses) { @@ -356,7 +355,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { private: enum class PaginatedResponseState { Pending, PartiallyFinished }; - std::vector PostProcess(std::vector &&responses) const { + std::vector PostProcess(std::vector &&responses) const { std::vector accessors; for (auto &response : responses) { for (auto &result_row : response.results) { @@ -385,22 +384,22 @@ class ShardRequestManager : public ShardRequestManagerInterface { return state.state != ExecutionState::INITIALIZING; } - void MaybeInitializeExecutionState(ExecutionState &state, - std::vector new_vertices) { + void MaybeInitializeExecutionState(ExecutionState &state, + std::vector new_vertices) { ThrowIfStateCompleted(state); if (ShallNotInitializeState(state)) { return; } state.transaction_id = transaction_id_; - std::map per_shard_request_table; + std::map per_shard_request_table; for (auto &new_vertex : new_vertices) { MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!"); auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id, storage::conversions::ConvertPropertyVector(new_vertex.primary_key)); if (!per_shard_request_table.contains(shard)) { - CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_}; + msgs::CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_}; per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst))); state.shard_cache.push_back(shard); } @@ -410,21 +409,22 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (auto &[shard, rqst] : per_shard_request_table) { state.requests.push_back(std::move(rqst)); } - state.state = ExecutionState::EXECUTING; + state.state = ExecutionState::EXECUTING; } - void MaybeInitializeExecutionState(ExecutionState &state, std::vector new_expands) { + void MaybeInitializeExecutionState(ExecutionState &state, + std::vector new_expands) { ThrowIfStateCompleted(state); if (ShallNotInitializeState(state)) { return; } state.transaction_id = transaction_id_; - std::map per_shard_request_table; + std::map per_shard_request_table; auto ensure_shard_exists_in_table = [&per_shard_request_table, transaction_id = transaction_id_](const Shard &shard) { if (!per_shard_request_table.contains(shard)) { - CreateExpandRequest create_expand_request{.transaction_id = transaction_id}; + msgs::CreateExpandRequest create_expand_request{.transaction_id = transaction_id}; per_shard_request_table.insert({shard, std::move(create_expand_request)}); } }; @@ -448,10 +448,10 @@ class ShardRequestManager : public ShardRequestManagerInterface { state.shard_cache.push_back(shard); state.requests.push_back(std::move(request)); } - state.state = ExecutionState::EXECUTING; + state.state = ExecutionState::EXECUTING; } - void MaybeInitializeExecutionState(ExecutionState &state) { + void MaybeInitializeExecutionState(ExecutionState &state) { ThrowIfStateCompleted(state); if (ShallNotInitializeState(state)) { return; @@ -471,23 +471,23 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (auto &[key, shard] : shards) { MG_ASSERT(!shard.empty()); state.shard_cache.push_back(std::move(shard)); - ScanVerticesRequest rqst; + msgs::ScanVerticesRequest rqst; rqst.transaction_id = transaction_id_; rqst.start_id.second = storage::conversions::ConvertValueVector(key); state.requests.push_back(std::move(rqst)); } } - state.state = ExecutionState::EXECUTING; + state.state = ExecutionState::EXECUTING; } - void MaybeInitializeExecutionState(ExecutionState &state, ExpandOneRequest request) { + void MaybeInitializeExecutionState(ExecutionState &state, msgs::ExpandOneRequest request) { ThrowIfStateCompleted(state); if (ShallNotInitializeState(state)) { return; } state.transaction_id = transaction_id_; - std::map per_shard_request_table; + std::map per_shard_request_table; auto top_level_rqst_template = request; top_level_rqst_template.transaction_id = transaction_id_; top_level_rqst_template.src_vertices.clear(); @@ -505,7 +505,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { for (auto &[shard, rqst] : per_shard_request_table) { state.requests.push_back(std::move(rqst)); } - state.state = ExecutionState::EXECUTING; + state.state = ExecutionState::EXECUTING; } StorageClient &GetStorageClientForShard(Shard shard) { @@ -532,20 +532,20 @@ class ShardRequestManager : public ShardRequestManagerInterface { storage_cli_manager_.AddClient(target_shard, std::move(cli)); } - void SendAllRequests(ExecutionState &state) { + void SendAllRequests(ExecutionState &state) { int64_t shard_idx = 0; for (const auto &request : state.requests) { const auto ¤t_shard = state.shard_cache[shard_idx]; auto &storage_client = GetStorageClientForShard(current_shard); - ReadRequests req = request; + msgs::ReadRequests req = request; storage_client.SendAsyncReadRequest(request); ++shard_idx; } } - void SendAllRequests(ExecutionState &state, + void SendAllRequests(ExecutionState &state, std::vector &shard_cache_ref) { size_t id = 0; for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { @@ -559,24 +559,25 @@ class ShardRequestManager : public ShardRequestManagerInterface { auto &storage_client = GetStorageClientForShard(*shard_it); - WriteRequests req = req_deep_copy; + msgs::WriteRequests req = req_deep_copy; storage_client.SendAsyncWriteRequest(req); ++id; } } - void SendAllRequests(ExecutionState &state, + void SendAllRequests(ExecutionState &state, std::vector &shard_cache_ref) { size_t id = 0; for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { auto &storage_client = GetStorageClientForShard(*shard_it); - ReadRequests req = state.requests[id]; + msgs::ReadRequests req = state.requests[id]; storage_client.SendAsyncReadRequest(req); ++id; } } - void AwaitOnResponses(ExecutionState &state, std::vector &responses) { + void AwaitOnResponses(ExecutionState &state, + std::vector &responses) { auto &shard_cache_ref = state.shard_cache; int64_t request_idx = 0; @@ -598,8 +599,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { throw std::runtime_error("CreateVertices request timed out"); } - WriteResponses response_variant = poll_result->GetValue(); - auto response = std::get(response_variant); + msgs::WriteResponses response_variant = poll_result->GetValue(); + auto response = std::get(response_variant); if (response.error) { throw std::runtime_error("CreateVertices request did not succeed"); @@ -613,7 +614,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { } } - void AwaitOnResponses(ExecutionState &state, std::vector &responses) { + void AwaitOnResponses(ExecutionState &state, + std::vector &responses) { auto &shard_cache_ref = state.shard_cache; int64_t request_idx = 0; @@ -631,8 +633,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { throw std::runtime_error("ExpandOne request timed out"); } - ReadResponses response_variant = poll_result->GetValue(); - auto response = std::get(response_variant); + msgs::ReadResponses response_variant = poll_result->GetValue(); + auto response = std::get(response_variant); // -NOTE- // Currently a boolean flag for signaling the overall success of the // ExpandOne request does not exist. But it should, so here we assume @@ -649,8 +651,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { } } - void AwaitOnPaginatedRequests(ExecutionState &state, - std::vector &responses, + void AwaitOnPaginatedRequests(ExecutionState &state, + std::vector &responses, std::map &paginated_response_tracker) { auto &shard_cache_ref = state.shard_cache; @@ -678,8 +680,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { throw std::runtime_error("ScanAll request timed out"); } - ReadResponses read_response_variant = await_result->GetValue(); - auto response = std::get(read_response_variant); + msgs::ReadResponses read_response_variant = await_result->GetValue(); + auto response = std::get(read_response_variant); if (response.error) { throw std::runtime_error("ScanAll request did not succeed"); } @@ -723,8 +725,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { storage::v3::NameIdMapper labels_; CoordinatorClient coord_cli_; RsmStorageClientManager storage_cli_manager_; - memgraph::io::Io io_; - memgraph::coordinator::Hlc transaction_id_; + io::Io io_; + coordinator::Hlc transaction_id_; // TODO(kostasrim) Add batch prefetching }; -} // namespace memgraph::msgs +} // namespace memgraph::query::v2 diff --git a/tests/simulation/shard_request_manager.cpp b/tests/simulation/shard_request_manager.cpp index 746ab385f..9db18659e 100644 --- a/tests/simulation/shard_request_manager.cpp +++ b/tests/simulation/shard_request_manager.cpp @@ -151,7 +151,7 @@ void RunStorageRaft(Raft state{.label = "test_label"}; auto result = io.Request(state); @@ -171,7 +171,7 @@ void TestScanVertices(msgs::ShardRequestManagerInterface &io) { } } -void TestCreateVertices(msgs::ShardRequestManagerInterface &io) { +void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_vertices; @@ -187,7 +187,7 @@ void TestCreateVertices(msgs::ShardRequestManagerInterface &io) { MG_ASSERT(result.size() == 2); } -void TestCreateExpand(msgs::ShardRequestManagerInterface &io) { +void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) { using PropVal = msgs::Value; msgs::ExecutionState state; std::vector new_expands; @@ -209,7 +209,7 @@ void TestCreateExpand(msgs::ShardRequestManagerInterface &io) { MG_ASSERT(responses[1].success); } -void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) { +void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { msgs::ExecutionState state{}; msgs::ExpandOneRequest request; const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); @@ -337,7 +337,7 @@ void DoTest() { // also get the current shard map CoordinatorClient coordinator_client(cli_io, c_addrs[0], c_addrs); - msgs::ShardRequestManager io(std::move(coordinator_client), std::move(cli_io)); + query::v2::ShardRequestManager io(std::move(coordinator_client), std::move(cli_io)); io.StartTransaction(); TestScanVertices(io); diff --git a/tests/simulation/test_cluster.hpp b/tests/simulation/test_cluster.hpp index 096009b7a..fd62b99f9 100644 --- a/tests/simulation/test_cluster.hpp +++ b/tests/simulation/test_cluster.hpp @@ -151,7 +151,7 @@ ShardMap TestShardMap(int n_splits, int replication_factor) { return sm; } -void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, +void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, std::set &correctness_model, CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -164,7 +164,7 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_mana return; } - msgs::ExecutionState state; + query::v2::ExecutionState state; auto label_id = shard_request_manager.NameToLabel("test_label"); @@ -182,9 +182,9 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_mana correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, +void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, std::set &correctness_model, ScanAll scan_all) { - msgs::ExecutionState request{.label = "test_label"}; + query::v2::ExecutionState request{.label = "test_label"}; auto results = shard_request_manager.Request(request); @@ -247,7 +247,8 @@ std::pair RunClusterSimulation(const CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); WaitForShardsToInitialize(coordinator_client); - msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), + std::move(cli_io)); shard_request_manager.StartTransaction(); diff --git a/tests/unit/high_density_shard_create_scan.cpp b/tests/unit/high_density_shard_create_scan.cpp index a0c0a0c28..e586a3556 100644 --- a/tests/unit/high_density_shard_create_scan.cpp +++ b/tests/unit/high_density_shard_create_scan.cpp @@ -161,7 +161,7 @@ ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards return sm; } -void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, +void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, std::set &correctness_model, CreateVertex create_vertex) { const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first); const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second); @@ -174,7 +174,7 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, return; } - msgs::ExecutionState state; + query::v2::ExecutionState state; auto label_id = shard_request_manager.NameToLabel("test_label"); @@ -192,9 +192,9 @@ void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second)); } -void ExecuteOp(msgs::ShardRequestManager &shard_request_manager, +void ExecuteOp(query::v2::ShardRequestManager &shard_request_manager, std::set &correctness_model, ScanAll scan_all) { - msgs::ExecutionState request{.label = "test_label"}; + query::v2::ExecutionState request{.label = "test_label"}; auto results = shard_request_manager.Request(request); @@ -245,7 +245,8 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op WaitForShardsToInitialize(coordinator_client); auto time_after_shard_stabilization = cli_io_2.Now(); - msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), + std::move(cli_io)); shard_request_manager.StartTransaction(); diff --git a/tests/unit/machine_manager.cpp b/tests/unit/machine_manager.cpp index 110220eda..834523fee 100644 --- a/tests/unit/machine_manager.cpp +++ b/tests/unit/machine_manager.cpp @@ -111,15 +111,15 @@ ShardMap TestShardMap() { template void TestScanAll(ShardRequestManager &shard_request_manager) { - msgs::ExecutionState state{.label = kLabelName}; + query::v2::ExecutionState state{.label = kLabelName}; auto result = shard_request_manager.Request(state); EXPECT_EQ(result.size(), 2); } -void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_manager) { using PropVal = msgs::Value; - msgs::ExecutionState state; + query::v2::ExecutionState state; std::vector new_vertices; auto label_id = shard_request_manager.NameToLabel(kLabelName); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}}; @@ -134,9 +134,9 @@ void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manage EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message; } -void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager) { +void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_manager) { using PropVal = msgs::Value; - msgs::ExecutionState state; + query::v2::ExecutionState state; std::vector new_expands; const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); @@ -155,8 +155,8 @@ void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager) MG_ASSERT(!responses[0].error.has_value()); } -void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) { - msgs::ExecutionState state{}; +void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) { + query::v2::ExecutionState state{}; msgs::ExpandOneRequest request; const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type"); const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")}; @@ -226,7 +226,8 @@ TEST(MachineManager, BasicFunctionality) { CoordinatorClient coordinator_client(cli_io, coordinator_address, {coordinator_address}); - msgs::ShardRequestManager shard_request_manager(std::move(coordinator_client), std::move(cli_io)); + query::v2::ShardRequestManager shard_request_manager(std::move(coordinator_client), + std::move(cli_io)); shard_request_manager.StartTransaction(); TestCreateVertices(shard_request_manager);