Factor HandleRead(msgs::ExpandOneRequest..

This commit is contained in:
jeremy 2022-10-28 15:31:29 +02:00
parent e0f6c951c1
commit b2e9717ec3

View File

@ -482,129 +482,6 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) {
return edge_filler;
}
msgs::ReadResponses HandleReadWithOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) {
bool action_successful = true;
std::vector<msgs::ExpandOneResultRow> results;
auto batch_limit = req.limit;
auto dba = DbAccessor{&acc};
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
auto edge_filler = InitializeEdgeFillerFunction(req);
auto vertex_iterable = acc.Vertices(View::OLD); // #NoCommit we get all vertices in the DB here, is it wanted?
const auto sorted_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by);
for (const auto &ordered_vertice : sorted_vertices) {
// Get Vertex acc
auto src_vertex_acc = ordered_vertice.object_acc;
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
}
}
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by);
const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by);
std::vector<EdgeAccessor> in_edge_ordered_accessors;
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
[](const auto &edge_element) { return edge_element.object_acc; });
std::vector<EdgeAccessor> out_edge_ordered_accessors;
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
[](const auto &edge_element) { return edge_element.object_acc; });
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
if (label_id.HasError()) {
action_successful = false;
break;
}
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
if (primary_key.HasError()) {
action_successful = false;
break;
}
msgs::VertexId src_vertice(msgs::Label{.id = *label_id},
conversions::ConvertValueVector(*primary_key)); // #NoCommit rename
auto maybe_result =
GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors,
maybe_filter_based_on_edge_uniquness, edge_filler);
if (!maybe_result) {
action_successful = false;
break;
}
results.emplace_back(maybe_result.value());
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
break;
}
}
msgs::ExpandOneResponse resp{};
resp.success = action_successful;
if (action_successful) {
resp.result = std::move(results);
}
return resp;
}
msgs::ReadResponses HandleReadWithoutOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) {
bool action_successful = true;
std::vector<msgs::ExpandOneResultRow> results;
auto batch_limit = req.limit;
auto dba = DbAccessor{&acc};
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
auto edge_filler = InitializeEdgeFillerFunction(req);
for (auto &src_vertex : req.src_vertices) {
// Get Vertex acc
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
if (!src_vertex_acc_opt) {
action_successful = false;
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
if (!result) {
action_successful = false;
break;
}
results.emplace_back(result.value());
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
break;
}
}
msgs::ExpandOneResponse resp{};
resp.success = action_successful;
if (action_successful) {
resp.result = std::move(results);
}
return resp;
}
}; // namespace
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
auto acc = shard_->Access(req.transaction_id);
@ -984,11 +861,102 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
}
msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
if (!req.order_by.empty()) {
return HandleReadWithOrderBy(std::move(req), shard_->Access(req.transaction_id));
auto acc = shard_->Access(req.transaction_id);
bool action_successful = true;
std::vector<msgs::ExpandOneResultRow> results;
auto batch_limit = req.limit;
auto dba = DbAccessor{&acc};
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
auto edge_filler = InitializeEdgeFillerFunction(req);
std::vector<VertexAccessor> vertex_accessors;
vertex_accessors.reserve(req.src_vertices.size());
for (auto &src_vertex : req.src_vertices) {
// Get Vertex acc
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
if (!src_vertex_acc_opt) {
action_successful = false;
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
}
}
vertex_accessors.emplace_back(src_vertex_acc_opt.value());
}
return HandleReadWithoutOrderBy(std::move(req), shard_->Access(req.transaction_id));
if (!req.order_by.empty()) {
// #NoCommit can we do differently to avoid this? We need OrderByElements but currently
// #NoCommit it returns vector<Element>, so this workaround is here to avoid more duplication later
auto sorted_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_accessors, req.order_by);
vertex_accessors.clear();
std::transform(sorted_vertices.begin(), sorted_vertices.end(), std::back_inserter(vertex_accessors),
[](auto &vertex) { return vertex.object_acc; });
}
for (const auto &src_vertex_acc : vertex_accessors) {
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
if (label_id.HasError()) {
action_successful = false;
break;
}
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
if (primary_key.HasError()) {
action_successful = false;
break;
}
msgs::VertexId src_vertice(msgs::Label{.id = *label_id},
conversions::ConvertValueVector(*primary_key)); // #NoCommit rename
std::optional<msgs::ExpandOneResultRow> maybe_result;
if (req.order_by.empty()) {
maybe_result = GetExpandOneResult(acc, src_vertice, req, maybe_filter_based_on_edge_uniquness, edge_filler);
} else {
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by);
const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by);
std::vector<EdgeAccessor> in_edge_ordered_accessors;
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
[](const auto &edge_element) { return edge_element.object_acc; });
std::vector<EdgeAccessor> out_edge_ordered_accessors;
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
[](const auto &edge_element) { return edge_element.object_acc; });
maybe_result = GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors,
out_edge_ordered_accessors, maybe_filter_based_on_edge_uniquness, edge_filler);
}
if (!maybe_result) {
action_successful = false;
break;
}
results.emplace_back(maybe_result.value());
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
break;
}
}
msgs::ExpandOneResponse resp{};
resp.success = action_successful;
if (action_successful) {
resp.result = std::move(results);
}
return resp;
}
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {