Make method names clear for RequestRouter requests, avoid unnecessary overloading
This commit is contained in:
parent
9144d2dccd
commit
8f08d986cb
@ -180,7 +180,7 @@ class DistributedCreateNodeCursor : public Cursor {
|
||||
auto &request_router = context.request_router;
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
request_router->Request(NodeCreationInfoToRequest(context, frame));
|
||||
request_router->CreateVertices(NodeCreationInfoToRequest(context, frame));
|
||||
}
|
||||
PlaceNodeOnTheFrame(frame, context);
|
||||
return true;
|
||||
@ -393,7 +393,7 @@ class DistributedScanAllAndFilterCursor : public Cursor {
|
||||
if (label_.has_value()) {
|
||||
request_label = request_router.LabelToName(*label_);
|
||||
}
|
||||
current_batch = request_router.Request(request_label);
|
||||
current_batch = request_router.ScanVertices(request_label);
|
||||
}
|
||||
current_vertex_it = current_batch.begin();
|
||||
request_state_ = State::COMPLETED;
|
||||
@ -2344,7 +2344,7 @@ class DistributedCreateExpandCursor : public Cursor {
|
||||
ResetExecutionState();
|
||||
{
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
request_router->Request(ExpandCreationInfoToRequest(context, frame));
|
||||
request_router->CreateExpand(ExpandCreationInfoToRequest(context, frame));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -2474,7 +2474,7 @@ class DistributedExpandCursor : public Cursor {
|
||||
request.edge_properties.emplace();
|
||||
request.src_vertices.push_back(get_dst_vertex(edge, direction));
|
||||
request.direction = (direction == EdgeAtom::Direction::IN) ? msgs::EdgeDirection::OUT : msgs::EdgeDirection::IN;
|
||||
auto result_rows = context.request_router->Request(std::move(request));
|
||||
auto result_rows = context.request_router->ExpandOne(std::move(request));
|
||||
MG_ASSERT(result_rows.size() == 1);
|
||||
auto &result_row = result_rows.front();
|
||||
frame[self_.common_.node_symbol] = accessors::VertexAccessor(
|
||||
@ -2501,7 +2501,7 @@ class DistributedExpandCursor : public Cursor {
|
||||
request.src_vertices.push_back(vertex.Id());
|
||||
auto result_rows = std::invoke([&context, &request]() mutable {
|
||||
SCOPED_REQUEST_WAIT_PROFILE;
|
||||
return context.request_router->Request(std::move(request));
|
||||
return context.request_router->ExpandOne(std::move(request));
|
||||
});
|
||||
MG_ASSERT(result_rows.size() == 1);
|
||||
auto &result_row = result_rows.front();
|
||||
|
@ -86,9 +86,6 @@ struct ExecutionState {
|
||||
// label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label
|
||||
// on the request itself.
|
||||
std::optional<std::string> label;
|
||||
// CompoundKey is optional because some operators require to iterate over all the available keys
|
||||
// of a shard. One example is ScanAll, where we only require the field label.
|
||||
std::optional<CompoundKey> key;
|
||||
// Transaction id to be filled by the RequestRouter implementation
|
||||
coordinator::Hlc transaction_id;
|
||||
// Initialized by RequestRouter implementation. This vector is filled with the shards that
|
||||
@ -111,10 +108,10 @@ class RequestRouterInterface {
|
||||
|
||||
virtual void StartTransaction() = 0;
|
||||
virtual void Commit() = 0;
|
||||
virtual std::vector<VertexAccessor> Request(std::optional<std::string> &label) = 0;
|
||||
virtual std::vector<msgs::CreateVerticesResponse> Request(std::vector<msgs::NewVertex> new_vertices) = 0;
|
||||
virtual std::vector<msgs::ExpandOneResultRow> Request(msgs::ExpandOneRequest request) = 0;
|
||||
virtual std::vector<msgs::CreateExpandResponse> Request(std::vector<msgs::NewExpand> new_edges) = 0;
|
||||
virtual std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) = 0;
|
||||
virtual std::vector<msgs::CreateVerticesResponse> CreateVertices(std::vector<msgs::NewVertex> new_vertices) = 0;
|
||||
virtual std::vector<msgs::ExpandOneResultRow> ExpandOne(msgs::ExpandOneRequest request) = 0;
|
||||
virtual std::vector<msgs::CreateExpandResponse> CreateExpand(std::vector<msgs::NewExpand> new_edges) = 0;
|
||||
|
||||
virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0;
|
||||
virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0;
|
||||
@ -240,10 +237,10 @@ class RequestRouter : public RequestRouterInterface {
|
||||
bool IsPrimaryLabel(storage::v3::LabelId label) const override { return shards_map_.label_spaces.contains(label); }
|
||||
|
||||
// TODO(kostasrim) Simplify return result
|
||||
std::vector<VertexAccessor> Request(std::optional<std::string> &label) override {
|
||||
std::vector<VertexAccessor> ScanVertices(std::optional<std::string> label) override {
|
||||
ExecutionState<msgs::ScanVerticesRequest> state = {};
|
||||
state.label = label;
|
||||
MaybeInitializeExecutionState(state);
|
||||
InitializeExecutionState(state);
|
||||
std::vector<msgs::ScanVerticesResponse> responses;
|
||||
|
||||
SendAllRequests(state);
|
||||
@ -257,10 +254,10 @@ class RequestRouter : public RequestRouterInterface {
|
||||
return PostProcess(std::move(responses));
|
||||
}
|
||||
|
||||
std::vector<msgs::CreateVerticesResponse> Request(std::vector<msgs::NewVertex> new_vertices) override {
|
||||
std::vector<msgs::CreateVerticesResponse> CreateVertices(std::vector<msgs::NewVertex> new_vertices) override {
|
||||
ExecutionState<msgs::CreateVerticesRequest> state = {};
|
||||
MG_ASSERT(!new_vertices.empty());
|
||||
MaybeInitializeExecutionState(state, new_vertices);
|
||||
InitializeExecutionState(state, new_vertices);
|
||||
std::vector<msgs::CreateVerticesResponse> responses;
|
||||
|
||||
// 1. Send the requests.
|
||||
@ -276,10 +273,10 @@ class RequestRouter : public RequestRouterInterface {
|
||||
return responses;
|
||||
}
|
||||
|
||||
std::vector<msgs::CreateExpandResponse> Request(std::vector<msgs::NewExpand> new_edges) override {
|
||||
std::vector<msgs::CreateExpandResponse> CreateExpand(std::vector<msgs::NewExpand> new_edges) override {
|
||||
ExecutionState<msgs::CreateExpandRequest> state = {};
|
||||
MG_ASSERT(!new_edges.empty());
|
||||
MaybeInitializeExecutionState(state, new_edges);
|
||||
InitializeExecutionState(state, new_edges);
|
||||
std::vector<msgs::CreateExpandResponse> responses;
|
||||
for (auto &request : state.requests) {
|
||||
auto &storage_client = GetStorageClientForShard(request.shard);
|
||||
@ -301,14 +298,14 @@ class RequestRouter : public RequestRouterInterface {
|
||||
return responses;
|
||||
}
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> Request(msgs::ExpandOneRequest request) override {
|
||||
std::vector<msgs::ExpandOneResultRow> ExpandOne(msgs::ExpandOneRequest request) override {
|
||||
ExecutionState<msgs::ExpandOneRequest> state = {};
|
||||
// TODO(kostasrim)Update to limit the batch size here
|
||||
// Expansions of the destination must be handled by the caller. For example
|
||||
// match (u:L1 { prop : 1 })-[:Friend]-(v:L1)
|
||||
// For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties
|
||||
// must be fetched again with an ExpandOne(Edges.dst)
|
||||
MaybeInitializeExecutionState(state, std::move(request));
|
||||
InitializeExecutionState(state, std::move(request));
|
||||
std::vector<msgs::ExpandOneResponse> responses;
|
||||
|
||||
// 1. Send the requests.
|
||||
@ -355,8 +352,8 @@ class RequestRouter : public RequestRouterInterface {
|
||||
return accessors;
|
||||
}
|
||||
|
||||
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
|
||||
std::vector<msgs::NewVertex> new_vertices) {
|
||||
void InitializeExecutionState(ExecutionState<msgs::CreateVerticesRequest> &state,
|
||||
std::vector<msgs::NewVertex> new_vertices) {
|
||||
state.transaction_id = transaction_id_;
|
||||
|
||||
std::map<Shard, msgs::CreateVerticesRequest> per_shard_request_table;
|
||||
@ -382,8 +379,8 @@ class RequestRouter : public RequestRouterInterface {
|
||||
}
|
||||
}
|
||||
|
||||
void MaybeInitializeExecutionState(ExecutionState<msgs::CreateExpandRequest> &state,
|
||||
std::vector<msgs::NewExpand> new_expands) {
|
||||
void InitializeExecutionState(ExecutionState<msgs::CreateExpandRequest> &state,
|
||||
std::vector<msgs::NewExpand> new_expands) {
|
||||
state.transaction_id = transaction_id_;
|
||||
|
||||
std::map<Shard, msgs::CreateExpandRequest> per_shard_request_table;
|
||||
@ -420,7 +417,7 @@ class RequestRouter : public RequestRouterInterface {
|
||||
}
|
||||
}
|
||||
|
||||
void MaybeInitializeExecutionState(ExecutionState<msgs::ScanVerticesRequest> &state) {
|
||||
void InitializeExecutionState(ExecutionState<msgs::ScanVerticesRequest> &state) {
|
||||
std::vector<coordinator::Shards> multi_shards;
|
||||
state.transaction_id = transaction_id_;
|
||||
if (!state.label) {
|
||||
@ -450,7 +447,7 @@ class RequestRouter : public RequestRouterInterface {
|
||||
}
|
||||
}
|
||||
|
||||
void MaybeInitializeExecutionState(ExecutionState<msgs::ExpandOneRequest> &state, msgs::ExpandOneRequest request) {
|
||||
void InitializeExecutionState(ExecutionState<msgs::ExpandOneRequest> &state, msgs::ExpandOneRequest request) {
|
||||
state.transaction_id = transaction_id_;
|
||||
|
||||
std::map<Shard, msgs::ExpandOneRequest> per_shard_request_table;
|
||||
|
@ -164,8 +164,6 @@ void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std
|
||||
return;
|
||||
}
|
||||
|
||||
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
|
||||
|
||||
auto label_id = request_router.NameToLabel("test_label");
|
||||
|
||||
msgs::NewVertex nv{.primary_key = primary_key};
|
||||
@ -174,7 +172,7 @@ void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std
|
||||
std::vector<msgs::NewVertex> new_vertices;
|
||||
new_vertices.push_back(std::move(nv));
|
||||
|
||||
auto result = request_router.Request(state, std::move(new_vertices));
|
||||
auto result = request_router.CreateVertices(std::move(new_vertices));
|
||||
|
||||
RC_ASSERT(result.size() == 1);
|
||||
RC_ASSERT(!result[0].error.has_value());
|
||||
@ -184,9 +182,7 @@ void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std
|
||||
|
||||
void ExecuteOp(query::v2::RequestRouter<SimulatorTransport> &request_router, std::set<CompoundKey> &correctness_model,
|
||||
ScanAll scan_all) {
|
||||
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
|
||||
|
||||
auto results = request_router.Request(request);
|
||||
auto results = request_router.ScanVertices("test_label");
|
||||
|
||||
RC_ASSERT(results.size() == correctness_model.size());
|
||||
|
||||
|
@ -182,7 +182,7 @@ void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::se
|
||||
std::vector<msgs::NewVertex> new_vertices;
|
||||
new_vertices.push_back(std::move(nv));
|
||||
|
||||
auto result = request_router.Request(std::move(new_vertices));
|
||||
auto result = request_router.CreateVertices(std::move(new_vertices));
|
||||
|
||||
MG_ASSERT(result.size() == 1);
|
||||
MG_ASSERT(!result[0].error.has_value());
|
||||
@ -194,7 +194,7 @@ void ExecuteOp(query::v2::RequestRouter<LocalTransport> &request_router, std::se
|
||||
ScanAll scan_all) {
|
||||
query::v2::ExecutionState<msgs::ScanVerticesRequest> request{.label = "test_label"};
|
||||
|
||||
auto results = request_router.Request(request);
|
||||
auto results = request_router.ScanVertices("test_label");
|
||||
|
||||
MG_ASSERT(results.size() == correctness_model.size());
|
||||
|
||||
|
@ -111,15 +111,12 @@ ShardMap TestShardMap() {
|
||||
|
||||
template <typename RequestRouter>
|
||||
void TestScanAll(RequestRouter &request_router) {
|
||||
query::v2::ExecutionState<msgs::ScanVerticesRequest> state{.label = kLabelName};
|
||||
|
||||
auto result = request_router.Request(state);
|
||||
auto result = request_router.ScanVertices(kLabelName);
|
||||
EXPECT_EQ(result.size(), 2);
|
||||
}
|
||||
|
||||
void TestCreateVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
using PropVal = msgs::Value;
|
||||
query::v2::ExecutionState<msgs::CreateVerticesRequest> state;
|
||||
std::vector<msgs::NewVertex> new_vertices;
|
||||
auto label_id = request_router.NameToLabel(kLabelName);
|
||||
msgs::NewVertex a1{.primary_key = {PropVal(int64_t(0)), PropVal(int64_t(0))}};
|
||||
@ -129,14 +126,13 @@ void TestCreateVertices(query::v2::RequestRouterInterface &request_router) {
|
||||
new_vertices.push_back(std::move(a1));
|
||||
new_vertices.push_back(std::move(a2));
|
||||
|
||||
auto result = request_router.Request(state, std::move(new_vertices));
|
||||
auto result = request_router.CreateVertices(std::move(new_vertices));
|
||||
EXPECT_EQ(result.size(), 1);
|
||||
EXPECT_FALSE(result[0].error.has_value()) << result[0].error->message;
|
||||
}
|
||||
|
||||
void TestCreateExpand(query::v2::RequestRouterInterface &request_router) {
|
||||
using PropVal = msgs::Value;
|
||||
query::v2::ExecutionState<msgs::CreateExpandRequest> state;
|
||||
std::vector<msgs::NewExpand> new_expands;
|
||||
|
||||
const auto edge_type_id = request_router.NameToEdgeType("edge_type");
|
||||
@ -150,20 +146,19 @@ void TestCreateExpand(query::v2::RequestRouterInterface &request_router) {
|
||||
new_expands.push_back(std::move(expand_1));
|
||||
new_expands.push_back(std::move(expand_2));
|
||||
|
||||
auto responses = request_router.Request(state, std::move(new_expands));
|
||||
auto responses = request_router.CreateExpand(std::move(new_expands));
|
||||
MG_ASSERT(responses.size() == 1);
|
||||
MG_ASSERT(!responses[0].error.has_value());
|
||||
}
|
||||
|
||||
void TestExpandOne(query::v2::RequestRouterInterface &request_router) {
|
||||
query::v2::ExecutionState<msgs::ExpandOneRequest> state{};
|
||||
msgs::ExpandOneRequest request;
|
||||
const auto edge_type_id = request_router.NameToEdgeType("edge_type");
|
||||
const auto label = msgs::Label{request_router.NameToLabel("test_label")};
|
||||
request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}});
|
||||
request.edge_types.push_back(msgs::EdgeType{edge_type_id});
|
||||
request.direction = msgs::EdgeDirection::BOTH;
|
||||
auto result_rows = request_router.Request(state, std::move(request));
|
||||
auto result_rows = request_router.ExpandOne(std::move(request));
|
||||
MG_ASSERT(result_rows.size() == 1);
|
||||
MG_ASSERT(result_rows[0].in_edges_with_all_properties.size() == 1);
|
||||
MG_ASSERT(result_rows[0].out_edges_with_all_properties.size() == 1);
|
||||
|
@ -82,23 +82,15 @@ class MockedRequestRouter : public RequestRouterInterface {
|
||||
}
|
||||
void StartTransaction() override {}
|
||||
void Commit() override {}
|
||||
std::vector<VertexAccessor> Request(ExecutionState<memgraph::msgs::ScanVerticesRequest> &state) override {
|
||||
std::vector<VertexAccessor> ScanVertices(std::optional<std::string> /* label */) override { return {}; }
|
||||
|
||||
std::vector<CreateVerticesResponse> CreateVertices(std::vector<memgraph::msgs::NewVertex> new_vertices) override {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state,
|
||||
std::vector<memgraph::msgs::NewVertex> new_vertices) override {
|
||||
return {};
|
||||
}
|
||||
std::vector<ExpandOneResultRow> ExpandOne(ExpandOneRequest request) override { return {}; }
|
||||
|
||||
std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override {
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state,
|
||||
std::vector<NewExpand> new_edges) override {
|
||||
return {};
|
||||
}
|
||||
std::vector<CreateExpandResponse> CreateExpand(std::vector<NewExpand> new_edges) override { return {}; }
|
||||
|
||||
const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override {
|
||||
return properties_.IdToName(id.AsUint());
|
||||
|
Loading…
Reference in New Issue
Block a user