diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 4c520858a..20808517d 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -482,129 +482,6 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { return edge_filler; } -msgs::ReadResponses HandleReadWithOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) { - bool action_successful = true; - - std::vector<msgs::ExpandOneResultRow> results; - auto batch_limit = req.limit; - auto dba = DbAccessor{&acc}; - - auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); - auto edge_filler = InitializeEdgeFillerFunction(req); - - auto vertex_iterable = acc.Vertices(View::OLD); // #NoCommit we get all vertices in the DB here, is it wanted? - const auto sorted_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by); - - for (const auto &ordered_vertice : sorted_vertices) { - // Get Vertex acc - auto src_vertex_acc = ordered_vertice.object_acc; - - if (!req.filters.empty()) { - // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol); - if (!eval) { - continue; - } - } - - auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); - const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by); - const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by); - - std::vector<EdgeAccessor> in_edge_ordered_accessors; - std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), - [](const auto &edge_element) { return edge_element.object_acc; }); - - std::vector<EdgeAccessor> out_edge_ordered_accessors; - std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), - [](const auto &edge_element) { return edge_element.object_acc; }); - - auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); - if (label_id.HasError()) { - action_successful = false; - break; - } - auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); - if (primary_key.HasError()) { - action_successful = false; - break; - } - - msgs::VertexId src_vertice(msgs::Label{.id = *label_id}, - conversions::ConvertValueVector(*primary_key)); // #NoCommit rename - auto maybe_result = - GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors, - maybe_filter_based_on_edge_uniquness, edge_filler); - - if (!maybe_result) { - action_successful = false; - break; - } - - results.emplace_back(maybe_result.value()); - if (batch_limit.has_value() && results.size() >= batch_limit.value()) { - break; - } - } - - msgs::ExpandOneResponse resp{}; - resp.success = action_successful; - if (action_successful) { - resp.result = std::move(results); - } - - return resp; -} - -msgs::ReadResponses HandleReadWithoutOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) { - bool action_successful = true; - - std::vector<msgs::ExpandOneResultRow> results; - auto batch_limit = req.limit; - auto dba = DbAccessor{&acc}; - - auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); - auto edge_filler = InitializeEdgeFillerFunction(req); - - for (auto &src_vertex : req.src_vertices) { - // Get Vertex acc - auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); - if (!src_vertex_acc_opt) { - action_successful = false; - spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", - req.transaction_id.logical_id); - break; - } - - if (!req.filters.empty()) { - // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); - if (!eval) { - continue; - } - } - auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler); - - if (!result) { - action_successful = false; - break; - } - - results.emplace_back(result.value()); - if (batch_limit.has_value() && results.size() >= batch_limit.value()) { - break; - } - } - - msgs::ExpandOneResponse resp{}; - resp.success = action_successful; - if (action_successful) { - resp.result = std::move(results); - } - - return resp; -} - }; // namespace msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) { auto acc = shard_->Access(req.transaction_id); @@ -984,11 +861,102 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { } msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { - if (!req.order_by.empty()) { - return HandleReadWithOrderBy(std::move(req), shard_->Access(req.transaction_id)); + auto acc = shard_->Access(req.transaction_id); + bool action_successful = true; + + std::vector<msgs::ExpandOneResultRow> results; + auto batch_limit = req.limit; + auto dba = DbAccessor{&acc}; + + auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows); + auto edge_filler = InitializeEdgeFillerFunction(req); + + std::vector<VertexAccessor> vertex_accessors; + vertex_accessors.reserve(req.src_vertices.size()); + for (auto &src_vertex : req.src_vertices) { + // Get Vertex acc + auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW); + if (!src_vertex_acc_opt) { + action_successful = false; + spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}", + req.transaction_id.logical_id); + break; + } + if (!req.filters.empty()) { + // NOTE - DbAccessor might get removed in the future. + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + if (!eval) { + continue; + } + } + + vertex_accessors.emplace_back(src_vertex_acc_opt.value()); } - return HandleReadWithoutOrderBy(std::move(req), shard_->Access(req.transaction_id)); + if (!req.order_by.empty()) { + // #NoCommit can we do differently to avoid this? We need OrderByElements but currently + // #NoCommit it returns vector<Element>, so this workaround is here to avoid more duplication later + auto sorted_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_accessors, req.order_by); + vertex_accessors.clear(); + std::transform(sorted_vertices.begin(), sorted_vertices.end(), std::back_inserter(vertex_accessors), + [](auto &vertex) { return vertex.object_acc; }); + } + + for (const auto &src_vertex_acc : vertex_accessors) { + auto label_id = src_vertex_acc.PrimaryLabel(View::NEW); + if (label_id.HasError()) { + action_successful = false; + break; + } + + auto primary_key = src_vertex_acc.PrimaryKey(View::NEW); + if (primary_key.HasError()) { + action_successful = false; + break; + } + + msgs::VertexId src_vertice(msgs::Label{.id = *label_id}, + conversions::ConvertValueVector(*primary_key)); // #NoCommit rename + + std::optional<msgs::ExpandOneResultRow> maybe_result; + + if (req.order_by.empty()) { + maybe_result = GetExpandOneResult(acc, src_vertice, req, maybe_filter_based_on_edge_uniquness, edge_filler); + + } else { + auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction); + const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by); + const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by); + + std::vector<EdgeAccessor> in_edge_ordered_accessors; + std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors), + [](const auto &edge_element) { return edge_element.object_acc; }); + + std::vector<EdgeAccessor> out_edge_ordered_accessors; + std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors), + [](const auto &edge_element) { return edge_element.object_acc; }); + maybe_result = GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, + out_edge_ordered_accessors, maybe_filter_based_on_edge_uniquness, edge_filler); + } + + if (!maybe_result) { + action_successful = false; + break; + } + + results.emplace_back(maybe_result.value()); + if (batch_limit.has_value() && results.size() >= batch_limit.value()) { + break; + } + } + + msgs::ExpandOneResponse resp{}; + resp.success = action_successful; + if (action_successful) { + resp.result = std::move(results); + } + + return resp; } msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {