Merge pull request #670 from memgraph/tyler_set_proper_namespace_for_ShardRequestManager

Change the namespace of ShardRequestManager to query::v2 instead of msgs
This commit is contained in:
Tyler Neely 2022-11-28 11:31:09 +01:00 committed by GitHub
commit f4d0c7769e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 225 additions and 223 deletions

View File

@ -72,7 +72,7 @@ query::v2::TypedValue ToTypedValue(const Value &value) {
}
communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAccessor &vertex,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View /*view*/) {
auto id = communication::bolt::Id::FromUint(0);
@ -92,7 +92,7 @@ communication::bolt::Vertex ToBoltVertex(const query::v2::accessors::VertexAcces
}
communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &edge,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication
auto id = communication::bolt::Id::FromUint(0);
@ -109,15 +109,15 @@ communication::bolt::Edge ToBoltEdge(const query::v2::accessors::EdgeAccessor &e
}
communication::bolt::Path ToBoltPath(const query::v2::accessors::Path & /*edge*/,
const msgs::ShardRequestManagerInterface * /*shard_request_manager*/,
const query::v2::ShardRequestManagerInterface * /*shard_request_manager*/,
storage::v3::View /*view*/) {
// TODO(jbajic) Fix bolt communication
MG_ASSERT(false, "Path is unimplemented!");
return {};
}
Value ToBoltValue(const query::v2::TypedValue &value, const msgs::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view) {
Value ToBoltValue(const query::v2::TypedValue &value,
const query::v2::ShardRequestManagerInterface *shard_request_manager, storage::v3::View view) {
switch (value.type()) {
case query::v2::TypedValue::Type::Null:
return {};

View File

@ -32,39 +32,39 @@ namespace memgraph::glue::v2 {
/// @param storage::v3::VertexAccessor for converting to
/// communication::bolt::Vertex.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting label and property names.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting label and property names.
/// @param storage::v3::View for deciding which vertex attributes are visible.
///
/// @throw std::bad_alloc
communication::bolt::Vertex ToBoltVertex(const storage::v3::VertexAccessor &vertex,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param storage::v3::EdgeAccessor for converting to communication::bolt::Edge.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager getting edge type and property names.
/// @param storage::v3::View for deciding which edge attributes are visible.
///
/// @throw std::bad_alloc
communication::bolt::Edge ToBoltEdge(const storage::v3::EdgeAccessor &edge,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param query::v2::Path for converting to communication::bolt::Path.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
communication::bolt::Path ToBoltPath(const query::v2::accessors::Path &path,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
/// @param query::v2::TypedValue for converting to communication::bolt::Value.
/// @param msgs::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param query::v2::ShardRequestManagerInterface *shard_request_manager ToBoltVertex and ToBoltEdge.
/// @param storage::v3::View for ToBoltVertex and ToBoltEdge.
///
/// @throw std::bad_alloc
communication::bolt::Value ToBoltValue(const query::v2::TypedValue &value,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
query::v2::TypedValue ToTypedValue(const communication::bolt::Value &value);
@ -76,7 +76,7 @@ storage::v3::PropertyValue ToPropertyValue(const communication::bolt::Value &val
communication::bolt::Value ToBoltValue(msgs::Value value);
communication::bolt::Value ToBoltValue(msgs::Value value,
const msgs::ShardRequestManagerInterface *shard_request_manager,
const query::v2::ShardRequestManagerInterface *shard_request_manager,
storage::v3::View view);
} // namespace memgraph::glue::v2

View File

@ -497,7 +497,8 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
/// before forwarding the calls to original TEncoder.
class TypedValueResultStream {
public:
TypedValueResultStream(TEncoder *encoder, const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager)
TypedValueResultStream(TEncoder *encoder,
const memgraph::query::v2::ShardRequestManagerInterface *shard_request_manager)
: encoder_(encoder), shard_request_manager_(shard_request_manager) {}
void Result(const std::vector<memgraph::query::v2::TypedValue> &values) {
@ -512,7 +513,7 @@ class BoltSession final : public memgraph::communication::bolt::Session<memgraph
private:
TEncoder *encoder_;
const memgraph::msgs::ShardRequestManagerInterface *shard_request_manager_{nullptr};
const memgraph::query::v2::ShardRequestManagerInterface *shard_request_manager_{nullptr};
};
memgraph::query::v2::Interpreter interpreter_;
memgraph::communication::v2::ServerEndpoint endpoint_;

View File

@ -15,7 +15,7 @@
#include "storage/v3/id_types.hpp"
namespace memgraph::query::v2::accessors {
EdgeAccessor::EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager)
EdgeAccessor::EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager)
: edge(std::move(edge)), manager_(manager) {}
EdgeTypeId EdgeAccessor::EdgeType() const { return edge.type.id; }
@ -44,11 +44,11 @@ VertexAccessor EdgeAccessor::From() const {
}
VertexAccessor::VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
const msgs::ShardRequestManagerInterface *manager)
const ShardRequestManagerInterface *manager)
: vertex(std::move(v)), properties(std::move(props)), manager_(manager) {}
VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
const msgs::ShardRequestManagerInterface *manager)
const ShardRequestManagerInterface *manager)
: vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size());
for (auto &[id, value] : props) {
@ -57,7 +57,7 @@ VertexAccessor::VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props,
}
VertexAccessor::VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props,
const msgs::ShardRequestManagerInterface *manager)
const ShardRequestManagerInterface *manager)
: vertex(std::move(v)), manager_(manager) {
properties.reserve(props.size());
for (const auto &[id, value] : props) {

View File

@ -24,24 +24,24 @@
#include "utils/memory.hpp"
#include "utils/memory_tracker.hpp"
namespace memgraph::msgs {
namespace memgraph::query::v2 {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
} // namespace memgraph::query::v2
namespace memgraph::query::v2::accessors {
using Value = memgraph::msgs::Value;
using Edge = memgraph::msgs::Edge;
using Vertex = memgraph::msgs::Vertex;
using Label = memgraph::msgs::Label;
using PropertyId = memgraph::msgs::PropertyId;
using EdgeTypeId = memgraph::msgs::EdgeTypeId;
using Value = msgs::Value;
using Edge = msgs::Edge;
using Vertex = msgs::Vertex;
using Label = msgs::Label;
using PropertyId = msgs::PropertyId;
using EdgeTypeId = msgs::EdgeTypeId;
class VertexAccessor;
class EdgeAccessor final {
public:
explicit EdgeAccessor(Edge edge, const msgs::ShardRequestManagerInterface *manager);
explicit EdgeAccessor(Edge edge, const ShardRequestManagerInterface *manager);
[[nodiscard]] EdgeTypeId EdgeType() const;
@ -69,7 +69,7 @@ class EdgeAccessor final {
private:
Edge edge;
const msgs::ShardRequestManagerInterface *manager_;
const ShardRequestManagerInterface *manager_;
};
class VertexAccessor final {
@ -78,10 +78,10 @@ class VertexAccessor final {
using Label = msgs::Label;
using VertexId = msgs::VertexId;
VertexAccessor(Vertex v, std::vector<std::pair<PropertyId, Value>> props,
const msgs::ShardRequestManagerInterface *manager);
const ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const msgs::ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const msgs::ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, std::map<PropertyId, Value> &&props, const ShardRequestManagerInterface *manager);
VertexAccessor(Vertex v, const std::map<PropertyId, Value> &props, const ShardRequestManagerInterface *manager);
[[nodiscard]] Label PrimaryLabel() const;
@ -150,7 +150,7 @@ class VertexAccessor final {
private:
Vertex vertex;
std::vector<std::pair<PropertyId, Value>> properties;
const msgs::ShardRequestManagerInterface *manager_;
const ShardRequestManagerInterface *manager_;
};
// inline VertexAccessor EdgeAccessor::To() const { return VertexAccessor(impl_.ToVertex()); }

View File

@ -24,27 +24,26 @@
#include "storage/v3/result.hpp"
#include "storage/v3/view.hpp"
namespace memgraph::msgs {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
namespace memgraph::query::v2 {
class ShardRequestManagerInterface;
inline const auto lam = [](const auto &val) { return ValueToTypedValue(val); };
namespace detail {
class Callable {
public:
auto operator()(const memgraph::storage::v3::PropertyValue &val) const {
return memgraph::storage::v3::PropertyToTypedValue<TypedValue>(val);
auto operator()(const storage::v3::PropertyValue &val) const {
return storage::v3::PropertyToTypedValue<TypedValue>(val);
};
auto operator()(const msgs::Value &val, memgraph::msgs::ShardRequestManagerInterface *manager) const {
auto operator()(const msgs::Value &val, ShardRequestManagerInterface *manager) const {
return ValueToTypedValue(val, manager);
};
};
} // namespace detail
using ExpressionEvaluator = memgraph::expr::ExpressionEvaluator<
TypedValue, memgraph::query::v2::EvaluationContext, memgraph::msgs::ShardRequestManagerInterface, storage::v3::View,
storage::v3::LabelId, msgs::Value, detail::Callable, common::ErrorCode, memgraph::expr::QueryEngineTag>;
using ExpressionEvaluator =
expr::ExpressionEvaluator<TypedValue, query::v2::EvaluationContext, ShardRequestManagerInterface, storage::v3::View,
storage::v3::LabelId, msgs::Value, detail::Callable, common::ErrorCode,
expr::QueryEngineTag>;
} // namespace memgraph::query::v2

View File

@ -60,8 +60,8 @@ struct EvaluationContext {
mutable std::unordered_map<std::string, int64_t> counters;
};
inline std::vector<storage::v3::PropertyId> NamesToProperties(
const std::vector<std::string> &property_names, msgs::ShardRequestManagerInterface *shard_request_manager) {
inline std::vector<storage::v3::PropertyId> NamesToProperties(const std::vector<std::string> &property_names,
ShardRequestManagerInterface *shard_request_manager) {
std::vector<storage::v3::PropertyId> properties;
// TODO Fix by using reference
properties.reserve(property_names.size());
@ -74,7 +74,7 @@ inline std::vector<storage::v3::PropertyId> NamesToProperties(
}
inline std::vector<storage::v3::LabelId> NamesToLabels(const std::vector<std::string> &label_names,
msgs::ShardRequestManagerInterface *shard_request_manager) {
ShardRequestManagerInterface *shard_request_manager) {
std::vector<storage::v3::LabelId> labels;
labels.reserve(label_names.size());
// TODO Fix by using reference
@ -97,7 +97,7 @@ struct ExecutionContext {
plan::ProfilingStats *stats_root{nullptr};
ExecutionStats execution_stats;
utils::AsyncTimer timer;
msgs::ShardRequestManagerInterface *shard_request_manager{nullptr};
ShardRequestManagerInterface *shard_request_manager{nullptr};
IdAllocator *edge_ids_alloc;
};

View File

@ -17,7 +17,7 @@
namespace memgraph::query::v2 {
inline TypedValue ValueToTypedValue(const msgs::Value &value, msgs::ShardRequestManagerInterface *manager) {
inline TypedValue ValueToTypedValue(const msgs::Value &value, ShardRequestManagerInterface *manager) {
using Value = msgs::Value;
switch (value.type) {
case Value::Type::Null:

View File

@ -118,7 +118,7 @@ ParsedQuery ParseQuery(const std::string &query_string, const std::map<std::stri
}
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters,
msgs::ShardRequestManagerInterface *shard_manager,
ShardRequestManagerInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers) {
auto vertex_counts = plan::MakeVertexCountCache(shard_manager);
auto symbol_table = expr::MakeSymbolTable(query, predefined_identifiers);
@ -130,7 +130,7 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
msgs::ShardRequestManagerInterface *shard_manager,
ShardRequestManagerInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers) {
std::optional<utils::SkipList<PlanCacheEntry>::Accessor> plan_cache_access;
if (plan_cache) {

View File

@ -132,7 +132,7 @@ class SingleNodeLogicalPlan final : public LogicalPlan {
};
std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery *query, const Parameters &parameters,
msgs::ShardRequestManagerInterface *shard_manager,
ShardRequestManagerInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers);
/**
@ -145,7 +145,7 @@ std::unique_ptr<LogicalPlan> MakeLogicalPlan(AstStorage ast_storage, CypherQuery
*/
std::shared_ptr<CachedPlan> CypherQueryToPlan(uint64_t hash, AstStorage ast_storage, CypherQuery *query,
const Parameters &parameters, utils::SkipList<PlanCacheEntry> *plan_cache,
msgs::ShardRequestManagerInterface *shard_manager,
ShardRequestManagerInterface *shard_manager,
const std::vector<Identifier *> &predefined_identifiers = {});
} // namespace memgraph::query::v2

View File

@ -20,12 +20,10 @@
#include "storage/v3/view.hpp"
#include "utils/memory.hpp"
namespace memgraph::msgs {
class ShardRequestManagerInterface;
} // namespace memgraph::msgs
namespace memgraph::query::v2 {
class ShardRequestManagerInterface;
namespace {
const char kStartsWith[] = "STARTSWITH";
const char kEndsWith[] = "ENDSWITH";
@ -36,7 +34,7 @@ const char kId[] = "ID";
struct FunctionContext {
// TODO(kostasrim) consider optional here. ShardRequestManager does not exist on the storage.
// DbAccessor *db_accessor;
msgs::ShardRequestManagerInterface *manager;
ShardRequestManagerInterface *manager;
utils::MemoryResource *memory;
int64_t timestamp;
std::unordered_map<std::string, int64_t> *counters;

View File

@ -143,7 +143,7 @@ class ReplQueryHandler final : public query::v2::ReplicationQueryHandler {
/// @throw QueryRuntimeException if an error ocurred.
Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Parameters &parameters,
msgs::ShardRequestManagerInterface *manager) {
ShardRequestManagerInterface *manager) {
// Empty frame for evaluation of password expression. This is OK since
// password should be either null or string literal and it's evaluation
// should not depend on frame.
@ -312,7 +312,7 @@ Callback HandleAuthQuery(AuthQuery *auth_query, AuthQueryHandler *auth, const Pa
}
Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &parameters,
InterpreterContext *interpreter_context, msgs::ShardRequestManagerInterface *manager,
InterpreterContext *interpreter_context, ShardRequestManagerInterface *manager,
std::vector<Notification> *notifications) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
@ -448,7 +448,7 @@ Callback HandleReplicationQuery(ReplicationQuery *repl_query, const Parameters &
}
Callback HandleSettingQuery(SettingQuery *setting_query, const Parameters &parameters,
msgs::ShardRequestManagerInterface *manager) {
ShardRequestManagerInterface *manager) {
expr::Frame<TypedValue> frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
@ -649,7 +649,7 @@ struct PullPlanVector {
struct PullPlan {
explicit PullPlan(std::shared_ptr<CachedPlan> plan, const Parameters &parameters, bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *shard_request_manager = nullptr,
ShardRequestManagerInterface *shard_request_manager = nullptr,
// TriggerContextCollector *trigger_context_collector = nullptr,
std::optional<size_t> memory_limit = {});
std::optional<plan::ProfilingStatsWithTotalTime> Pull(AnyStream *stream, std::optional<int> n,
@ -679,7 +679,7 @@ struct PullPlan {
PullPlan::PullPlan(const std::shared_ptr<CachedPlan> plan, const Parameters &parameters, const bool is_profile_query,
DbAccessor *dba, InterpreterContext *interpreter_context, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *shard_request_manager, const std::optional<size_t> memory_limit)
ShardRequestManagerInterface *shard_request_manager, const std::optional<size_t> memory_limit)
: plan_(plan),
cursor_(plan->plan().MakeCursor(execution_memory)),
frame_(plan->symbol_table().max_position(), execution_memory),
@ -804,7 +804,7 @@ Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_
auto random_uuid = boost::uuids::uuid{boost::uuids::random_generator()()};
auto query_io = interpreter_context_->io.ForkLocal(random_uuid);
shard_request_manager_ = std::make_unique<msgs::ShardRequestManager<io::local_transport::LocalTransport>>(
shard_request_manager_ = std::make_unique<ShardRequestManager<io::local_transport::LocalTransport>>(
coordinator::CoordinatorClient<io::local_transport::LocalTransport>(
query_io, interpreter_context_->coordinator_address, std::vector{interpreter_context_->coordinator_address}),
std::move(query_io));
@ -881,7 +881,7 @@ PreparedQuery Interpreter::PrepareTransactionQuery(std::string_view query_upper)
PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context, DbAccessor *dba,
utils::MemoryResource *execution_memory, std::vector<Notification> *notifications,
msgs::ShardRequestManagerInterface *shard_request_manager) {
ShardRequestManagerInterface *shard_request_manager) {
// TriggerContextCollector *trigger_context_collector = nullptr) {
auto *cypher_query = utils::Downcast<CypherQuery>(parsed_query.query);
@ -942,7 +942,7 @@ PreparedQuery PrepareCypherQuery(ParsedQuery parsed_query, std::map<std::string,
PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string, TypedValue> *summary,
InterpreterContext *interpreter_context,
msgs::ShardRequestManagerInterface *shard_request_manager,
ShardRequestManagerInterface *shard_request_manager,
utils::MemoryResource *execution_memory) {
const std::string kExplainQueryStart = "explain ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kExplainQueryStart),
@ -991,7 +991,7 @@ PreparedQuery PrepareExplainQuery(ParsedQuery parsed_query, std::map<std::string
PreparedQuery PrepareProfileQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *shard_request_manager = nullptr) {
ShardRequestManagerInterface *shard_request_manager = nullptr) {
const std::string kProfileQueryStart = "profile ";
MG_ASSERT(utils::StartsWith(utils::ToLowerCase(parsed_query.stripped_query.query()), kProfileQueryStart),
@ -1185,7 +1185,7 @@ PreparedQuery PrepareIndexQuery(ParsedQuery parsed_query, bool in_explicit_trans
PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transaction,
std::map<std::string, TypedValue> *summary, InterpreterContext *interpreter_context,
DbAccessor *dba, utils::MemoryResource *execution_memory,
msgs::ShardRequestManagerInterface *manager) {
ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw UserModificationInMulticommandTxException();
}
@ -1221,7 +1221,7 @@ PreparedQuery PrepareAuthQuery(ParsedQuery parsed_query, bool in_explicit_transa
PreparedQuery PrepareReplicationQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
std::vector<Notification> *notifications, InterpreterContext *interpreter_context,
msgs::ShardRequestManagerInterface *manager) {
ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw ReplicationModificationInMulticommandTxException();
}
@ -1317,7 +1317,7 @@ PreparedQuery PrepareCreateSnapshotQuery(ParsedQuery parsed_query, bool in_expli
}
PreparedQuery PrepareSettingQuery(ParsedQuery parsed_query, const bool in_explicit_transaction,
msgs::ShardRequestManagerInterface *manager) {
ShardRequestManagerInterface *manager) {
if (in_explicit_transaction) {
throw SettingConfigInMulticommandTxException{};
}

View File

@ -296,7 +296,7 @@ class Interpreter final {
*/
void Abort();
const msgs::ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
const ShardRequestManagerInterface *GetShardRequestManager() const { return shard_request_manager_.get(); }
private:
struct QueryExecution {
@ -342,7 +342,7 @@ class Interpreter final {
// move this unique_ptr into a shared_ptr.
std::unique_ptr<storage::v3::Shard::Accessor> db_accessor_;
std::optional<DbAccessor> execution_db_accessor_;
std::unique_ptr<msgs::ShardRequestManagerInterface> shard_request_manager_;
std::unique_ptr<ShardRequestManagerInterface> shard_request_manager_;
bool in_explicit_transaction_{false};
bool expect_rollback_{false};

View File

@ -252,7 +252,7 @@ class DistributedCreateNodeCursor : public Cursor {
std::vector<const NodeCreationInfo *> nodes_info_;
std::vector<std::vector<std::pair<storage::v3::PropertyId, msgs::Value>>> src_vertex_props_;
std::vector<msgs::PrimaryKey> primary_keys_;
msgs::ExecutionState<msgs::CreateVerticesRequest> state_;
ExecutionState<msgs::CreateVerticesRequest> state_;
};
bool Once::OnceCursor::Pull(Frame &, ExecutionContext &context) {
@ -365,7 +365,7 @@ class ScanAllCursor : public Cursor {
std::optional<decltype(vertices_.value().begin())> vertices_it_;
const char *op_name_;
std::vector<msgs::ScanVerticesResponse> current_batch;
msgs::ExecutionState<msgs::ScanVerticesRequest> request_state;
ExecutionState<msgs::ScanVerticesRequest> request_state;
};
class DistributedScanAllAndFilterCursor : public Cursor {
@ -386,7 +386,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
using VertexAccessor = accessors::VertexAccessor;
bool MakeRequest(msgs::ShardRequestManagerInterface &shard_manager, ExecutionContext &context) {
bool MakeRequest(ShardRequestManagerInterface &shard_manager, ExecutionContext &context) {
{
SCOPED_REQUEST_WAIT_PROFILE;
current_batch = shard_manager.Request(request_state_);
@ -403,7 +403,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
if (MustAbort(context)) {
throw HintedAbortError();
}
using State = msgs::ExecutionState<msgs::ScanVerticesRequest>;
using State = ExecutionState<msgs::ScanVerticesRequest>;
if (request_state_.state == State::INITIALIZING) {
if (!input_cursor_->Pull(frame, context)) {
@ -430,7 +430,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
void ResetExecutionState() {
current_batch.clear();
current_vertex_it = current_batch.end();
request_state_ = msgs::ExecutionState<msgs::ScanVerticesRequest>{};
request_state_ = ExecutionState<msgs::ScanVerticesRequest>{};
}
void Reset() override {
@ -444,7 +444,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
const char *op_name_;
std::vector<VertexAccessor> current_batch;
std::vector<VertexAccessor>::iterator current_vertex_it;
msgs::ExecutionState<msgs::ScanVerticesRequest> request_state_;
ExecutionState<msgs::ScanVerticesRequest> request_state_;
std::optional<storage::v3::LabelId> label_;
std::optional<std::pair<storage::v3::PropertyId, Expression *>> property_expression_pair_;
std::optional<std::vector<Expression *>> filter_expressions_;
@ -2426,7 +2426,7 @@ class DistributedCreateExpandCursor : public Cursor {
const UniqueCursorPtr input_cursor_;
const CreateExpand &self_;
msgs::ExecutionState<msgs::CreateExpandRequest> state_;
ExecutionState<msgs::CreateExpandRequest> state_;
};
class DistributedExpandCursor : public Cursor {
@ -2473,7 +2473,7 @@ class DistributedExpandCursor : public Cursor {
request.edge_properties.emplace();
request.src_vertices.push_back(get_dst_vertex(edge, direction));
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = context.shard_request_manager->Request(request_state, std::move(request));
MG_ASSERT(result_rows.size() == 1);
auto &result_row = result_rows.front();
@ -2499,7 +2499,7 @@ class DistributedExpandCursor : public Cursor {
// to not fetch any properties of the edges
request.edge_properties.emplace();
request.src_vertices.push_back(vertex.Id());
msgs::ExecutionState<msgs::ExpandOneRequest> request_state;
ExecutionState<msgs::ExpandOneRequest> request_state;
auto result_rows = std::invoke([&context, &request_state, &request]() mutable {
SCOPED_REQUEST_WAIT_PROFILE;
return context.shard_request_manager->Request(request_state, std::move(request));

View File

@ -19,7 +19,7 @@
namespace memgraph::query::v2::plan {
PlanPrinter::PlanPrinter(const msgs::ShardRequestManagerInterface *request_manager, std::ostream *out)
PlanPrinter::PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out)
: request_manager_(request_manager), out_(out) {}
#define PRE_VISIT(TOp) \
@ -263,14 +263,14 @@ void PlanPrinter::Branch(query::v2::plan::LogicalOperator &op, const std::string
--depth_;
}
void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root,
void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root,
std::ostream *out) {
PlanPrinter printer(&request_manager, out);
// FIXME(mtomic): We should make visitors that take const arguments.
const_cast<LogicalOperator *>(plan_root)->Accept(printer);
}
nlohmann::json PlanToJson(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) {
nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) {
impl::PlanToJsonVisitor visitor(&request_manager);
// FIXME(mtomic): We should make visitors that take const arguments.
const_cast<LogicalOperator *>(plan_root)->Accept(visitor);
@ -349,15 +349,15 @@ json ToJson(const utils::Bound<Expression *> &bound) {
json ToJson(const Symbol &symbol) { return symbol.name(); }
json ToJson(storage::v3::EdgeTypeId edge_type, const msgs::ShardRequestManagerInterface &request_manager) {
json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager) {
return request_manager.EdgeTypeToName(edge_type);
}
json ToJson(storage::v3::LabelId label, const msgs::ShardRequestManagerInterface &request_manager) {
json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager) {
return request_manager.LabelToName(label);
}
json ToJson(storage::v3::PropertyId property, const msgs::ShardRequestManagerInterface &request_manager) {
json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager) {
return request_manager.PropertyToName(property);
}
@ -369,7 +369,7 @@ json ToJson(NamedExpression *nexpr) {
}
json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties,
const msgs::ShardRequestManagerInterface &request_manager) {
const ShardRequestManagerInterface &request_manager) {
json json;
for (const auto &prop_pair : properties) {
json.emplace(ToJson(prop_pair.first, request_manager), ToJson(prop_pair.second));
@ -377,7 +377,7 @@ json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>>
return json;
}
json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerInterface &request_manager) {
json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager) {
json self;
self["symbol"] = ToJson(node_info.symbol);
self["labels"] = ToJson(node_info.labels, request_manager);
@ -386,7 +386,7 @@ json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerIn
return self;
}
json ToJson(const EdgeCreationInfo &edge_info, const msgs::ShardRequestManagerInterface &request_manager) {
json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager) {
json self;
self["symbol"] = ToJson(edge_info.symbol);
const auto *props = std::get_if<PropertiesMapList>(&edge_info.properties);

View File

@ -30,17 +30,17 @@ class LogicalOperator;
/// ShardRequestManager is needed for resolving label and property names.
/// Note that `plan_root` isn't modified, but we can't take it as a const
/// because we don't have support for visiting a const LogicalOperator.
void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root,
void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root,
std::ostream *out);
/// Overload of `PrettyPrint` which defaults the `std::ostream` to `std::cout`.
inline void PrettyPrint(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) {
inline void PrettyPrint(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root) {
PrettyPrint(request_manager, plan_root, &std::cout);
}
/// Convert a `LogicalOperator` plan to a JSON representation.
/// DbAccessor is needed for resolving label and property names.
nlohmann::json PlanToJson(const msgs::ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root);
nlohmann::json PlanToJson(const ShardRequestManagerInterface &request_manager, const LogicalOperator *plan_root);
class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
public:
@ -48,7 +48,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
using HierarchicalLogicalOperatorVisitor::PreVisit;
using HierarchicalLogicalOperatorVisitor::Visit;
PlanPrinter(const msgs::ShardRequestManagerInterface *request_manager, std::ostream *out);
PlanPrinter(const ShardRequestManagerInterface *request_manager, std::ostream *out);
bool DefaultPreVisit() override;
@ -115,7 +115,7 @@ class PlanPrinter : public virtual HierarchicalLogicalOperatorVisitor {
void Branch(LogicalOperator &op, const std::string &branch_name = "");
int64_t depth_{0};
const msgs::ShardRequestManagerInterface *request_manager_{nullptr};
const ShardRequestManagerInterface *request_manager_{nullptr};
std::ostream *out_{nullptr};
};
@ -133,20 +133,20 @@ nlohmann::json ToJson(const utils::Bound<Expression *> &bound);
nlohmann::json ToJson(const Symbol &symbol);
nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const msgs::ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(storage::v3::EdgeTypeId edge_type, const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(storage::v3::LabelId label, const msgs::ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(storage::v3::LabelId label, const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(storage::v3::PropertyId property, const msgs::ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(storage::v3::PropertyId property, const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(NamedExpression *nexpr);
nlohmann::json ToJson(const std::vector<std::pair<storage::v3::PropertyId, Expression *>> &properties,
const msgs::ShardRequestManagerInterface &request_manager);
const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(const NodeCreationInfo &node_info, const msgs::ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(const NodeCreationInfo &node_info, const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const msgs::ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(const EdgeCreationInfo &edge_info, const ShardRequestManagerInterface &request_manager);
nlohmann::json ToJson(const Aggregate::Element &elem);
@ -161,8 +161,7 @@ nlohmann::json ToJson(const std::vector<T> &items, Args &&...args) {
class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
public:
explicit PlanToJsonVisitor(const msgs::ShardRequestManagerInterface *request_manager)
: request_manager_(request_manager) {}
explicit PlanToJsonVisitor(const ShardRequestManagerInterface *request_manager) : request_manager_(request_manager) {}
using HierarchicalLogicalOperatorVisitor::PostVisit;
using HierarchicalLogicalOperatorVisitor::PreVisit;
@ -218,7 +217,7 @@ class PlanToJsonVisitor : public virtual HierarchicalLogicalOperatorVisitor {
protected:
nlohmann::json output_;
const msgs::ShardRequestManagerInterface *request_manager_;
const ShardRequestManagerInterface *request_manager_;
nlohmann::json PopOutput() {
nlohmann::json tmp;

View File

@ -57,7 +57,7 @@ class VertexCountCache {
bool LabelPropertyIndexExists(storage::v3::LabelId /*label*/, storage::v3::PropertyId /*property*/) { return false; }
msgs::ShardRequestManagerInterface *shard_request_manager_;
ShardRequestManagerInterface *shard_request_manager_;
};
template <class TDbAccessor>

View File

@ -42,13 +42,12 @@
#include "storage/v3/value_conversions.hpp"
#include "utils/result.hpp"
namespace memgraph::msgs {
namespace memgraph::query::v2 {
template <typename TStorageClient>
class RsmStorageClientManager {
public:
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
using LabelId = memgraph::storage::v3::LabelId;
using CompoundKey = io::rsm::ShardRsmKey;
using Shard = coordinator::Shard;
RsmStorageClientManager() = default;
RsmStorageClientManager(const RsmStorageClientManager &) = delete;
RsmStorageClientManager(RsmStorageClientManager &&) = delete;
@ -74,8 +73,8 @@ class RsmStorageClientManager {
template <typename TRequest>
struct ExecutionState {
using CompoundKey = memgraph::io::rsm::ShardRsmKey;
using Shard = memgraph::coordinator::Shard;
using CompoundKey = io::rsm::ShardRsmKey;
using Shard = coordinator::Shard;
enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED };
// label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label
@ -85,7 +84,7 @@ struct ExecutionState {
// of a shard. One example is ScanAll, where we only require the field label.
std::optional<CompoundKey> key;
// Transaction id to be filled by the ShardRequestManager implementation
memgraph::coordinator::Hlc transaction_id;
coordinator::Hlc transaction_id;
// Initialized by ShardRequestManager implementation. This vector is filled with the shards that
// the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that
// it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes
@ -104,7 +103,7 @@ struct ExecutionState {
class ShardRequestManagerInterface {
public:
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
using VertexAccessor = query::v2::accessors::VertexAccessor;
ShardRequestManagerInterface() = default;
ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete;
ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete;
@ -115,38 +114,38 @@ class ShardRequestManagerInterface {
virtual void StartTransaction() = 0;
virtual void Commit() = 0;
virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0;
virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) = 0;
virtual std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state,
ExpandOneRequest request) = 0;
virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
std::vector<NewExpand> new_edges) = 0;
virtual std::vector<VertexAccessor> Request(ExecutionState<msgs::ScanVerticesRequest> &state) = 0;
virtual std::vector<msgs::CreateVerticesResponse> Request(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) = 0;
virtual std::vector<msgs::ExpandOneResultRow> Request(ExecutionState<msgs::ExpandOneRequest> &state,
msgs::ExpandOneRequest request) = 0;
virtual std::vector<msgs::CreateExpandResponse> Request(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_edges) = 0;
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0;
virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0;
virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0;
virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0;
virtual bool IsPrimaryLabel(LabelId label) const = 0;
virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0;
virtual const std::string &PropertyToName(storage::v3::PropertyId prop) const = 0;
virtual const std::string &LabelToName(storage::v3::LabelId label) const = 0;
virtual const std::string &EdgeTypeToName(storage::v3::EdgeTypeId type) const = 0;
virtual bool IsPrimaryLabel(storage::v3::LabelId label) const = 0;
virtual bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const = 0;
};
// TODO(kostasrim)rename this class template
template <typename TTransport>
class ShardRequestManager : public ShardRequestManagerInterface {
public:
using StorageClient =
memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>;
using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests;
using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>;
using Address = memgraph::io::Address;
using Shard = memgraph::coordinator::Shard;
using ShardMap = memgraph::coordinator::ShardMap;
using CompoundKey = memgraph::coordinator::PrimaryKey;
using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor;
ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io)
using StorageClient = coordinator::RsmClient<TTransport, msgs::WriteRequests, msgs::WriteResponses,
msgs::ReadRequests, msgs::ReadResponses>;
using CoordinatorWriteRequests = coordinator::CoordinatorWriteRequests;
using CoordinatorClient = coordinator::CoordinatorClient<TTransport>;
using Address = io::Address;
using Shard = coordinator::Shard;
using ShardMap = coordinator::ShardMap;
using CompoundKey = coordinator::PrimaryKey;
using VertexAccessor = query::v2::accessors::VertexAccessor;
ShardRequestManager(CoordinatorClient coord, io::Io<TTransport> &&io)
: coord_cli_(std::move(coord)), io_(std::move(io)) {}
ShardRequestManager(const ShardRequestManager &) = delete;
@ -157,14 +156,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
~ShardRequestManager() override {}
void StartTransaction() override {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
CoordinatorWriteRequests write_req = req;
auto write_res = coord_cli_.SendWriteRequest(write_req);
if (write_res.HasError()) {
throw std::runtime_error("HLC request failed");
}
auto coordinator_write_response = write_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response);
auto hlc_response = std::get<coordinator::HlcResponse>(coordinator_write_response);
// Transaction ID to be used later...
transaction_id_ = hlc_response.new_hlc;
@ -176,14 +175,14 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
void Commit() override {
memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()};
CoordinatorWriteRequests write_req = req;
auto write_res = coord_cli_.SendWriteRequest(write_req);
if (write_res.HasError()) {
throw std::runtime_error("HLC request for commit failed");
}
auto coordinator_write_response = write_res.GetValue();
auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response);
auto hlc_response = std::get<coordinator::HlcResponse>(coordinator_write_response);
if (hlc_response.fresher_shard_map) {
shards_map_ = hlc_response.fresher_shard_map.value();
@ -204,8 +203,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
if (commit_response.HasError()) {
throw std::runtime_error("Commit request timed out");
}
WriteResponses write_response_variant = commit_response.GetValue();
auto &response = std::get<CommitResponse>(write_response_variant);
msgs::WriteResponses write_response_variant = commit_response.GetValue();
auto &response = std::get<msgs::CommitResponse>(write_response_variant);
if (response.error) {
throw std::runtime_error("Commit request did not succeed");
}
@ -225,17 +224,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return shards_map_.GetLabelId(name).value();
}
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
const std::string &PropertyToName(storage::v3::PropertyId id) const override {
return properties_.IdToName(id.AsUint());
}
const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override {
return labels_.IdToName(id.AsUint());
}
const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override {
const std::string &LabelToName(storage::v3::LabelId id) const override { return labels_.IdToName(id.AsUint()); }
const std::string &EdgeTypeToName(storage::v3::EdgeTypeId id) const override {
return edge_types_.IdToName(id.AsUint());
}
bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override {
bool IsPrimaryKey(storage::v3::LabelId primary_label, storage::v3::PropertyId property) const override {
const auto schema_it = shards_map_.schemas.find(primary_label);
MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint());
@ -244,12 +241,12 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}) != schema_it->second.end();
}
bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); }
bool IsPrimaryLabel(storage::v3::LabelId label) const override { return shards_map_.label_spaces.contains(label); }
// TODO(kostasrim) Simplify return result
std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override {
std::vector<VertexAccessor> Request(ExecutionState<msgs::ScanVerticesRequest> &state) override {
MaybeInitializeExecutionState(state);
std::vector<ScanVerticesResponse> responses;
std::vector<msgs::ScanVerticesResponse> responses;
SendAllRequests(state);
auto all_requests_gathered = [](auto &paginated_rsp_tracker) {
@ -273,11 +270,11 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return PostProcess(std::move(responses));
}
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) override {
std::vector<msgs::CreateVerticesResponse> Request(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) override {
MG_ASSERT(!new_vertices.empty());
MaybeInitializeExecutionState(state, new_vertices);
std::vector<CreateVerticesResponse> responses;
std::vector<msgs::CreateVerticesResponse> responses;
auto &shard_cache_ref = state.shard_cache;
// 1. Send the requests.
@ -294,22 +291,22 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return responses;
}
std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
std::vector<NewExpand> new_edges) override {
std::vector<msgs::CreateExpandResponse> Request(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_edges) override {
MG_ASSERT(!new_edges.empty());
MaybeInitializeExecutionState(state, new_edges);
std::vector<CreateExpandResponse> responses;
std::vector<msgs::CreateExpandResponse> responses;
auto &shard_cache_ref = state.shard_cache;
size_t id{0};
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) {
auto &storage_client = GetStorageClientForShard(*shard_it);
WriteRequests req = state.requests[id];
msgs::WriteRequests req = state.requests[id];
auto write_response_result = storage_client.SendWriteRequest(std::move(req));
if (write_response_result.HasError()) {
throw std::runtime_error("CreateVertices request timedout");
}
WriteResponses response_variant = write_response_result.GetValue();
CreateExpandResponse mapped_response = std::get<CreateExpandResponse>(response_variant);
msgs::WriteResponses response_variant = write_response_result.GetValue();
msgs::CreateExpandResponse mapped_response = std::get<msgs::CreateExpandResponse>(response_variant);
if (mapped_response.error) {
throw std::runtime_error("CreateExpand request did not succeed");
@ -322,14 +319,15 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return responses;
}
std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override {
std::vector<msgs::ExpandOneResultRow> Request(ExecutionState<msgs::ExpandOneRequest> &state,
msgs::ExpandOneRequest request) override {
// TODO(kostasrim)Update to limit the batch size here
// Expansions of the destination must be handled by the caller. For example
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
// For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties
// must be fetched again with an ExpandOne(Edges.dst)
MaybeInitializeExecutionState(state, std::move(request));
std::vector<ExpandOneResponse> responses;
std::vector<msgs::ExpandOneResponse> responses;
auto &shard_cache_ref = state.shard_cache;
// 1. Send the requests.
@ -339,10 +337,11 @@ class ShardRequestManager : public ShardRequestManagerInterface {
do {
AwaitOnResponses(state, responses);
} while (!state.shard_cache.empty());
std::vector<ExpandOneResultRow> result_rows;
const auto total_row_count = std::accumulate(
responses.begin(), responses.end(), 0,
[](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); });
std::vector<msgs::ExpandOneResultRow> result_rows;
const auto total_row_count = std::accumulate(responses.begin(), responses.end(), 0,
[](const int64_t partial_count, const msgs::ExpandOneResponse &resp) {
return partial_count + resp.result.size();
});
result_rows.reserve(total_row_count);
for (auto &response : responses) {
@ -356,7 +355,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
private:
enum class PaginatedResponseState { Pending, PartiallyFinished };
std::vector<VertexAccessor> PostProcess(std::vector<ScanVerticesResponse> &&responses) const {
std::vector<VertexAccessor> PostProcess(std::vector<msgs::ScanVerticesResponse> &&responses) const {
std::vector<VertexAccessor> accessors;
for (auto &response : responses) {
for (auto &result_row : response.results) {
@ -385,22 +384,22 @@ class ShardRequestManager : public ShardRequestManagerInterface {
return state.state != ExecutionState::INITIALIZING;
}
void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state,
std::vector<NewVertex> new_vertices) {
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::NewVertex> new_vertices) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, CreateVerticesRequest> per_shard_request_table;
std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
for (auto &new_vertex : new_vertices) {
MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!");
auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id,
storage::conversions::ConvertPropertyVector(new_vertex.primary_key));
if (!per_shard_request_table.contains(shard)) {
CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_};
msgs::CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_};
per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst)));
state.shard_cache.push_back(shard);
}
@ -410,21 +409,22 @@ class ShardRequestManager : public ShardRequestManagerInterface {
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<CreateVerticesRequest>::EXECUTING;
state.state = ExecutionState<msgs::CreateVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_expands) {
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateExpandRequest> &state,
std::vector<msgs::NewExpand> new_expands) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, CreateExpandRequest> per_shard_request_table;
std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
auto ensure_shard_exists_in_table = [&per_shard_request_table,
transaction_id = transaction_id_](const Shard &shard) {
if (!per_shard_request_table.contains(shard)) {
CreateExpandRequest create_expand_request{.transaction_id = transaction_id};
msgs::CreateExpandRequest create_expand_request{.transaction_id = transaction_id};
per_shard_request_table.insert({shard, std::move(create_expand_request)});
}
};
@ -448,10 +448,10 @@ class ShardRequestManager : public ShardRequestManagerInterface {
state.shard_cache.push_back(shard);
state.requests.push_back(std::move(request));
}
state.state = ExecutionState<CreateExpandRequest>::EXECUTING;
state.state = ExecutionState<msgs::CreateExpandRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) {
void MaybeInitializeExecutionState(ExecutionState<msgs::ScanVerticesRequest> &state) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
@ -471,23 +471,23 @@ class ShardRequestManager : public ShardRequestManagerInterface {
for (auto &[key, shard] : shards) {
MG_ASSERT(!shard.empty());
state.shard_cache.push_back(std::move(shard));
ScanVerticesRequest rqst;
msgs::ScanVerticesRequest rqst;
rqst.transaction_id = transaction_id_;
rqst.start_id.second = storage::conversions::ConvertValueVector(key);
state.requests.push_back(std::move(rqst));
}
}
state.state = ExecutionState<ScanVerticesRequest>::EXECUTING;
state.state = ExecutionState<msgs::ScanVerticesRequest>::EXECUTING;
}
void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) {
void MaybeInitializeExecutionState(ExecutionState<msgs::ExpandOneRequest> &state, msgs::ExpandOneRequest request) {
ThrowIfStateCompleted(state);
if (ShallNotInitializeState(state)) {
return;
}
state.transaction_id = transaction_id_;
std::map<Shard, ExpandOneRequest> per_shard_request_table;
std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
auto top_level_rqst_template = request;
top_level_rqst_template.transaction_id = transaction_id_;
top_level_rqst_template.src_vertices.clear();
@ -505,7 +505,7 @@ class ShardRequestManager : public ShardRequestManagerInterface {
for (auto &[shard, rqst] : per_shard_request_table) {
state.requests.push_back(std::move(rqst));
}
state.state = ExecutionState<ExpandOneRequest>::EXECUTING;
state.state = ExecutionState<msgs::ExpandOneRequest>::EXECUTING;
}
StorageClient &GetStorageClientForShard(Shard shard) {
@ -532,20 +532,20 @@ class ShardRequestManager : public ShardRequestManagerInterface {
storage_cli_manager_.AddClient(target_shard, std::move(cli));
}
void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) {
void SendAllRequests(ExecutionState<msgs::ScanVerticesRequest> &state) {
int64_t shard_idx = 0;
for (const auto &request : state.requests) {
const auto &current_shard = state.shard_cache[shard_idx];
auto &storage_client = GetStorageClientForShard(current_shard);
ReadRequests req = request;
msgs::ReadRequests req = request;
storage_client.SendAsyncReadRequest(request);
++shard_idx;
}
}
void SendAllRequests(ExecutionState<CreateVerticesRequest> &state,
void SendAllRequests(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<memgraph::coordinator::Shard> &shard_cache_ref) {
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) {
@ -559,24 +559,25 @@ class ShardRequestManager : public ShardRequestManagerInterface {
auto &storage_client = GetStorageClientForShard(*shard_it);
WriteRequests req = req_deep_copy;
msgs::WriteRequests req = req_deep_copy;
storage_client.SendAsyncWriteRequest(req);
++id;
}
}
void SendAllRequests(ExecutionState<ExpandOneRequest> &state,
void SendAllRequests(ExecutionState<msgs::ExpandOneRequest> &state,
std::vector<memgraph::coordinator::Shard> &shard_cache_ref) {
size_t id = 0;
for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) {
auto &storage_client = GetStorageClientForShard(*shard_it);
ReadRequests req = state.requests[id];
msgs::ReadRequests req = state.requests[id];
storage_client.SendAsyncReadRequest(req);
++id;
}
}
void AwaitOnResponses(ExecutionState<CreateVerticesRequest> &state, std::vector<CreateVerticesResponse> &responses) {
void AwaitOnResponses(ExecutionState<msgs::CreateVerticesRequest> &state,
std::vector<msgs::CreateVerticesResponse> &responses) {
auto &shard_cache_ref = state.shard_cache;
int64_t request_idx = 0;
@ -598,8 +599,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
throw std::runtime_error("CreateVertices request timed out");
}
WriteResponses response_variant = poll_result->GetValue();
auto response = std::get<CreateVerticesResponse>(response_variant);
msgs::WriteResponses response_variant = poll_result->GetValue();
auto response = std::get<msgs::CreateVerticesResponse>(response_variant);
if (response.error) {
throw std::runtime_error("CreateVertices request did not succeed");
@ -613,7 +614,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
}
void AwaitOnResponses(ExecutionState<ExpandOneRequest> &state, std::vector<ExpandOneResponse> &responses) {
void AwaitOnResponses(ExecutionState<msgs::ExpandOneRequest> &state,
std::vector<msgs::ExpandOneResponse> &responses) {
auto &shard_cache_ref = state.shard_cache;
int64_t request_idx = 0;
@ -631,8 +633,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
throw std::runtime_error("ExpandOne request timed out");
}
ReadResponses response_variant = poll_result->GetValue();
auto response = std::get<ExpandOneResponse>(response_variant);
msgs::ReadResponses response_variant = poll_result->GetValue();
auto response = std::get<msgs::ExpandOneResponse>(response_variant);
// -NOTE-
// Currently a boolean flag for signaling the overall success of the
// ExpandOne request does not exist. But it should, so here we assume
@ -649,8 +651,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
}
}
void AwaitOnPaginatedRequests(ExecutionState<ScanVerticesRequest> &state,
std::vector<ScanVerticesResponse> &responses,
void AwaitOnPaginatedRequests(ExecutionState<msgs::ScanVerticesRequest> &state,
std::vector<msgs::ScanVerticesResponse> &responses,
std::map<Shard, PaginatedResponseState> &paginated_response_tracker) {
auto &shard_cache_ref = state.shard_cache;
@ -678,8 +680,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
throw std::runtime_error("ScanAll request timed out");
}
ReadResponses read_response_variant = await_result->GetValue();
auto response = std::get<ScanVerticesResponse>(read_response_variant);
msgs::ReadResponses read_response_variant = await_result->GetValue();
auto response = std::get<msgs::ScanVerticesResponse>(read_response_variant);
if (response.error) {
throw std::runtime_error("ScanAll request did not succeed");
}
@ -723,8 +725,8 @@ class ShardRequestManager : public ShardRequestManagerInterface {
storage::v3::NameIdMapper labels_;
CoordinatorClient coord_cli_;
RsmStorageClientManager<StorageClient> storage_cli_manager_;
memgraph::io::Io<TTransport> io_;
memgraph::coordinator::Hlc transaction_id_;
io::Io<TTransport> io_;
coordinator::Hlc transaction_id_;
// TODO(kostasrim) Add batch prefetching
};
} // namespace memgraph::msgs
} // namespace memgraph::query::v2

View File

@ -151,7 +151,7 @@ void RunStorageRaft(Raft<IoImpl, MockedShardRsm, WriteRequests, WriteResponses,
server.Run();
}
void TestScanVertices(msgs::ShardRequestManagerInterface &io) {
void TestScanVertices(query::v2::ShardRequestManagerInterface &io) {
msgs::ExecutionState<ScanVerticesRequest> state{.label = "test_label"};
auto result = io.Request(state);
@ -171,7 +171,7 @@ void TestScanVertices(msgs::ShardRequestManagerInterface &io) {
}
}
void TestCreateVertices(msgs::ShardRequestManagerInterface &io) {
void TestCreateVertices(query::v2::ShardRequestManagerInterface &io) {
using PropVal = msgs::Value;
msgs::ExecutionState<CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices;
@ -187,7 +187,7 @@ void TestCreateVertices(msgs::ShardRequestManagerInterface &io) {
MG_ASSERT(result.size() == 2);
}
void TestCreateExpand(msgs::ShardRequestManagerInterface &io) {
void TestCreateExpand(query::v2::ShardRequestManagerInterface &io) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands;
@ -209,7 +209,7 @@ void TestCreateExpand(msgs::ShardRequestManagerInterface &io) {
MG_ASSERT(responses[1].success);
}
void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) {
void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) {
msgs::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
@ -337,7 +337,7 @@ void DoTest() {
// also get the current shard map
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, c_addrs[0], c_addrs);
msgs::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
query::v2::ShardRequestManager<SimulatorTransport> io(std::move(coordinator_client), std::move(cli_io));
io.StartTransaction();
TestScanVertices(io);

View File

@ -151,7 +151,7 @@ ShardMap TestShardMap(int n_splits, int replication_factor) {
return sm;
}
void ExecuteOp(msgs::ShardRequestManager<SimulatorTransport> &shard_request_manager,
void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request_manager,
std::set<CompoundKey> &correctness_model, CreateVertex create_vertex) {
const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first);
const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second);
@ -164,7 +164,7 @@ void ExecuteOp(msgs::ShardRequestManager<SimulatorTransport> &shard_request_mana
return;
}
msgs::ExecutionState<msgs::CreateVerticesRequest> state;
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
auto label_id = shard_request_manager.NameToLabel("test_label");
@ -182,9 +182,9 @@ void ExecuteOp(msgs::ShardRequestManager<SimulatorTransport> &shard_request_mana
correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second));
}
void ExecuteOp(msgs::ShardRequestManager<SimulatorTransport> &shard_request_manager,
void ExecuteOp(query::v2::ShardRequestManager<SimulatorTransport> &shard_request_manager,
std::set<CompoundKey> &correctness_model, ScanAll scan_all) {
msgs::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
auto results = shard_request_manager.Request(request);
@ -247,7 +247,8 @@ std::pair<SimulatorStats, LatencyHistogramSummaries> RunClusterSimulation(const
CoordinatorClient<SimulatorTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
WaitForShardsToInitialize(coordinator_client);
msgs::ShardRequestManager<SimulatorTransport> shard_request_manager(std::move(coordinator_client), std::move(cli_io));
query::v2::ShardRequestManager<SimulatorTransport> shard_request_manager(std::move(coordinator_client),
std::move(cli_io));
shard_request_manager.StartTransaction();

View File

@ -161,7 +161,7 @@ ShardMap TestShardMap(int shards, int replication_factor, int gap_between_shards
return sm;
}
void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_manager,
std::set<CompoundKey> &correctness_model, CreateVertex create_vertex) {
const auto key1 = memgraph::storage::v3::PropertyValue(create_vertex.first);
const auto key2 = memgraph::storage::v3::PropertyValue(create_vertex.second);
@ -174,7 +174,7 @@ void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
return;
}
msgs::ExecutionState<msgs::CreateVerticesRequest> state;
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
auto label_id = shard_request_manager.NameToLabel("test_label");
@ -192,9 +192,9 @@ void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
correctness_model.emplace(std::make_pair(create_vertex.first, create_vertex.second));
}
void ExecuteOp(msgs::ShardRequestManager<LocalTransport> &shard_request_manager,
void ExecuteOp(query::v2::ShardRequestManager<LocalTransport> &shard_request_manager,
std::set<CompoundKey> &correctness_model, ScanAll scan_all) {
msgs::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
auto results = shard_request_manager.Request(request);
@ -245,7 +245,8 @@ void RunWorkload(int shards, int replication_factor, int create_ops, int scan_op
WaitForShardsToInitialize(coordinator_client);
auto time_after_shard_stabilization = cli_io_2.Now();
msgs::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), std::move(cli_io));
query::v2::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client),
std::move(cli_io));
shard_request_manager.StartTransaction();

View File

@ -111,15 +111,15 @@ ShardMap TestShardMap() {
template <typename ShardRequestManager>
void TestScanAll(ShardRequestManager &shard_request_manager) {
msgs::ExecutionState<msgs::ScanVerticesRequest> state{.label = kLabelName};
query::v2::ExecutionState<msgs::ScanVerticesRequest> state{.label = kLabelName};
auto result = shard_request_manager.Request(state);
EXPECT_EQ(result.size(), 2);
}
void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manager) {
void TestCreateVertices(query::v2::ShardRequestManagerInterface &shard_request_manager) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateVerticesRequest> state;
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
std::vector<msgs::NewVertex> new_vertices;
auto label_id = shard_request_manager.NameToLabel(kLabelName);
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}};
@ -134,9 +134,9 @@ void TestCreateVertices(msgs::ShardRequestManagerInterface &shard_request_manage
EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message;
}
void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager) {
void TestCreateExpand(query::v2::ShardRequestManagerInterface &shard_request_manager) {
using PropVal = msgs::Value;
msgs::ExecutionState<msgs::CreateExpandRequest> state;
query::v2::ExecutionState<msgs::CreateExpandRequest> state;
std::vector<msgs::NewExpand> new_expands;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
@ -155,8 +155,8 @@ void TestCreateExpand(msgs::ShardRequestManagerInterface &shard_request_manager)
MG_ASSERT(!responses[0].error.has_value());
}
void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) {
msgs::ExecutionState<msgs::ExpandOneRequest> state{};
void TestExpandOne(query::v2::ShardRequestManagerInterface &shard_request_manager) {
query::v2::ExecutionState<msgs::ExpandOneRequest> state{};
msgs::ExpandOneRequest request;
const auto edge_type_id = shard_request_manager.NameToEdgeType("edge_type");
const auto label = msgs::Label{shard_request_manager.NameToLabel("test_label")};
@ -226,7 +226,8 @@ TEST(MachineManager, BasicFunctionality) {
CoordinatorClient<LocalTransport> coordinator_client(cli_io, coordinator_address, {coordinator_address});
msgs::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client), std::move(cli_io));
query::v2::ShardRequestManager<LocalTransport> shard_request_manager(std::move(coordinator_client),
std::move(cli_io));
shard_request_manager.StartTransaction();
TestCreateVertices(shard_request_manager);