From e901c1fdb78abfe25becb4ec428af3409e221ea2 Mon Sep 17 00:00:00 2001 From: jeremy Date: Tue, 25 Oct 2022 10:45:33 +0200 Subject: [PATCH] Refactor code --- src/storage/v3/shard_rsm.cpp | 231 +++++++++++++++++++---------------- 1 file changed, 126 insertions(+), 105 deletions(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 7b64731d9..cae2c93d7 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -354,7 +354,7 @@ std::optional GetExpandOneResult( return result_row; } -std::optional GetExpandOneResult2( +std::optional GetExpandOneResult( VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req, std::vector in_edge_accessors, std::vector out_edge_accessors, const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) { @@ -494,6 +494,129 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { return edge_filler; } +msgs::ReadResponses HandleReadWithOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) { + bool action_successful = true; + + std::vector results; + auto batch_limit = req.limit; + auto dba = DbAccessor{&acc}; + + auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); + auto edge_filler = InitializeEdgeFillerFunction(req); + + auto vertex_iterable = acc.Vertices(View::OLD); + const auto ordered_vertices = OrderByElements(acc, dba, vertex_iterable, req.order_by); + + for (const auto &ordered_vertice : ordered_vertices) { + // Get Vertex acc + auto src_vertex_acc = ordered_vertice.object_acc; + + if (!req.filters.empty()) { + // NOTE - DbAccessor might get removed in the future. + const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol); + if (!eval) { + continue; + } + } + + auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); + const auto in_ordered_edges = OrderByElements(acc, dba, in_edge_accessors, req.order_by); + const auto out_ordered_edges = OrderByElements(acc, dba, out_edge_accessors, req.order_by); + + std::vector in_edge_ordered_accessors; + std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), + [](auto &edge_element) { return edge_element.object_acc; }); + + std::vector out_edge_ordered_accessors; + std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), + [](auto &edge_element) { return edge_element.object_acc; }); + + auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); + if (label_id.HasError()) { + action_successful = false; + break; + } + auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); + if (primary_key.HasError()) { + action_successful = false; + break; + } + + msgs::VertexId src_vertice(msgs::Label{.id = *label_id}, conversions::ConvertValueVector(*primary_key)); + auto maybe_result = + GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors, + maybe_filter_based_on_edge_uniquness, edge_filler); + + if (!maybe_result) { + action_successful = false; + break; + } + + results.emplace_back(maybe_result.value()); + if (batch_limit.has_value() && results.size() >= batch_limit.value()) { + break; + } + } + + msgs::ExpandOneResponse resp{}; + resp.success = action_successful; + if (action_successful) { + resp.result = std::move(results); + } + + return resp; +} + +msgs::ReadResponses HandleReadWithoutOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) { + bool action_successful = true; + + std::vector results; + auto batch_limit = req.limit; + auto dba = DbAccessor{&acc}; + + auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); + auto edge_filler = InitializeEdgeFillerFunction(req); + + for (auto &src_vertex : req.src_vertices) { + // Get Vertex acc + auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); + if (!src_vertex_acc_opt) { + action_successful = false; + spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", + req.transaction_id.logical_id); + break; + } + + if (!req.filters.empty()) { + // NOTE - DbAccessor might get removed in the future. + auto dba = DbAccessor{&acc}; + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + if (!eval) { + continue; + } + } + auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); + + if (!result) { + action_successful = false; + break; + } + + results.emplace_back(result.value()); + if (batch_limit.has_value() && results.size() >= batch_limit.value()) { + break; + } + } + + msgs::ExpandOneResponse resp{}; + resp.success = action_successful; + if (action_successful) { + resp.result = std::move(results); + } + + return resp; +} + }; // namespace msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { auto acc = shard_->Access(req.transaction_id); @@ -887,113 +1010,11 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { } msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { - auto acc = shard_->Access(req.transaction_id); - bool action_successful = true; - - std::vector results; - auto batch_limit = req.limit; - auto dba = DbAccessor{&acc}; - - auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); - auto edge_filler = InitializeEdgeFillerFunction(req); - - //#NoCommit code below is duplicated, one needs to factor it once it's clear! if (!req.order_by.empty()) { - auto vertex_iterable = acc.Vertices(View::OLD); - const auto ordered_vertices = OrderByElements(acc, dba, vertex_iterable, req.order_by); - - for (auto &ordered_vertice : ordered_vertices) { - // Get Vertex acc - auto src_vertex_acc = ordered_vertice.object_acc; - - if (!req.filters.empty()) { - // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol); - if (!eval) { - continue; - } - } - - auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); - const auto in_ordered_edges = OrderByElements(acc, dba, in_edge_accessors, req.order_by); - const auto out_ordered_edges = OrderByElements(acc, dba, out_edge_accessors, req.order_by); - - std::vector in_edge_ordered_accessors; - std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), - [](auto &edge_element) { return edge_element.object_acc; }); - - std::vector out_edge_ordered_accessors; - std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), - [](auto &edge_element) { return edge_element.object_acc; }); - - auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); - if (label_id.HasError()) { - action_successful = false; - break; - } - auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); - if (primary_key.HasError()) { - action_successful = false; - break; - } - - msgs::VertexId src_vertice; - src_vertice.first = msgs::Label{.id = *label_id}; - src_vertice.second = conversions::ConvertValueVector(*primary_key); - auto maybe_result = - GetExpandOneResult2(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors, - maybe_filter_based_on_edge_uniquness, edge_filler); - - if (!maybe_result) { - action_successful = false; - break; - } - - results.emplace_back(maybe_result.value()); - if (batch_limit.has_value() && results.size() >= batch_limit.value()) { - break; - } - } + return HandleReadWithOrderBy(std::move(req), shard_->Access(req.transaction_id)); } else { - for (auto &src_vertex : req.src_vertices) { - // Get Vertex acc - auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); - if (!src_vertex_acc_opt) { - action_successful = false; - spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", - req.transaction_id.logical_id); - break; - } - - if (!req.filters.empty()) { - // NOTE - DbAccessor might get removed in the future. - auto dba = DbAccessor{&acc}; - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); - if (!eval) { - continue; - } - } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); - - if (!result) { - action_successful = false; - break; - } - - results.emplace_back(result.value()); - if (batch_limit.has_value() && results.size() >= batch_limit.value()) { - break; - } - } + return HandleReadWithoutOrderBy(std::move(req), shard_->Access(req.transaction_id)); } - - msgs::ExpandOneResponse resp{}; - resp.success = action_successful; - if (action_successful) { - resp.result = std::move(results); - } - - return resp; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {