Refactor code
This commit is contained in:
parent
016b3ee0d2
commit
e901c1fdb7
@ -354,7 +354,7 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
return result_row;
|
||||
}
|
||||
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult2(
|
||||
std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
VertexAccessor v_acc, msgs::VertexId src_vertex, const msgs::ExpandOneRequest &req,
|
||||
std::vector<EdgeAccessor> in_edge_accessors, std::vector<EdgeAccessor> out_edge_accessors,
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler) {
|
||||
@ -494,6 +494,129 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) {
|
||||
return edge_filler;
|
||||
}
|
||||
|
||||
msgs::ReadResponses HandleReadWithOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) {
|
||||
bool action_successful = true;
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> results;
|
||||
auto batch_limit = req.limit;
|
||||
auto dba = DbAccessor{&acc};
|
||||
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
auto vertex_iterable = acc.Vertices(View::OLD);
|
||||
const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by);
|
||||
|
||||
for (const auto &ordered_vertice : ordered_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc = ordered_vertice.object_acc;
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
|
||||
const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by);
|
||||
const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by);
|
||||
|
||||
std::vector<EdgeAccessor> in_edge_ordered_accessors;
|
||||
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
std::vector<EdgeAccessor> out_edge_ordered_accessors;
|
||||
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
|
||||
if (label_id.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
|
||||
if (primary_key.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
msgs::VertexId src_vertice(msgs::Label{.id = *label_id}, conversions::ConvertValueVector(*primary_key));
|
||||
auto maybe_result =
|
||||
GetExpandOneResult(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors,
|
||||
maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!maybe_result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(maybe_result.value());
|
||||
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
msgs::ExpandOneResponse resp{};
|
||||
resp.success = action_successful;
|
||||
if (action_successful) {
|
||||
resp.result = std::move(results);
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
msgs::ReadResponses HandleReadWithoutOrderBy(msgs::ExpandOneRequest &&req, Shard::Accessor &&acc) {
|
||||
bool action_successful = true;
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> results;
|
||||
auto batch_limit = req.limit;
|
||||
auto dba = DbAccessor{&acc};
|
||||
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
|
||||
if (!src_vertex_acc_opt) {
|
||||
action_successful = false;
|
||||
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
auto dba = DbAccessor{&acc};
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(result.value());
|
||||
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
msgs::ExpandOneResponse resp{};
|
||||
resp.success = action_successful;
|
||||
if (action_successful) {
|
||||
resp.result = std::move(results);
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
}; // namespace
|
||||
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CreateVerticesRequest &&req) {
|
||||
auto acc = shard_->Access(req.transaction_id);
|
||||
@ -887,113 +1010,11 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
|
||||
}
|
||||
|
||||
msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
|
||||
auto acc = shard_->Access(req.transaction_id);
|
||||
bool action_successful = true;
|
||||
|
||||
std::vector<msgs::ExpandOneResultRow> results;
|
||||
auto batch_limit = req.limit;
|
||||
auto dba = DbAccessor{&acc};
|
||||
|
||||
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
|
||||
auto edge_filler = InitializeEdgeFillerFunction(req);
|
||||
|
||||
//#NoCommit code below is duplicated, one needs to factor it once it's clear!
|
||||
if (!req.order_by.empty()) {
|
||||
auto vertex_iterable = acc.Vertices(View::OLD);
|
||||
const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_by);
|
||||
|
||||
for (auto &ordered_vertice : ordered_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc = ordered_vertice.object_acc;
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc, req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
auto [in_edge_accessors, out_edge_accessors] = GetEdgesFromVertex(src_vertex_acc, req.direction);
|
||||
const auto in_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, in_edge_accessors, req.order_by);
|
||||
const auto out_ordered_edges = OrderByElements<EdgeAccessor>(acc, dba, out_edge_accessors, req.order_by);
|
||||
|
||||
std::vector<EdgeAccessor> in_edge_ordered_accessors;
|
||||
std::transform(in_ordered_edges.begin(), in_ordered_edges.end(), std::back_inserter(in_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
std::vector<EdgeAccessor> out_edge_ordered_accessors;
|
||||
std::transform(out_ordered_edges.begin(), out_ordered_edges.end(), std::back_inserter(out_edge_ordered_accessors),
|
||||
[](auto &edge_element) { return edge_element.object_acc; });
|
||||
|
||||
auto label_id = src_vertex_acc.PrimaryLabel(View::NEW);
|
||||
if (label_id.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
auto primary_key = src_vertex_acc.PrimaryKey(View::NEW);
|
||||
if (primary_key.HasError()) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
msgs::VertexId src_vertice;
|
||||
src_vertice.first = msgs::Label{.id = *label_id};
|
||||
src_vertice.second = conversions::ConvertValueVector(*primary_key);
|
||||
auto maybe_result =
|
||||
GetExpandOneResult2(src_vertex_acc, src_vertice, req, in_edge_ordered_accessors, out_edge_ordered_accessors,
|
||||
maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!maybe_result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(maybe_result.value());
|
||||
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return HandleReadWithOrderBy(std::move(req), shard_->Access(req.transaction_id));
|
||||
} else {
|
||||
for (auto &src_vertex : req.src_vertices) {
|
||||
// Get Vertex acc
|
||||
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
|
||||
if (!src_vertex_acc_opt) {
|
||||
action_successful = false;
|
||||
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
|
||||
req.transaction_id.logical_id);
|
||||
break;
|
||||
}
|
||||
|
||||
if (!req.filters.empty()) {
|
||||
// NOTE - DbAccessor might get removed in the future.
|
||||
auto dba = DbAccessor{&acc};
|
||||
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
|
||||
if (!eval) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
|
||||
|
||||
if (!result) {
|
||||
action_successful = false;
|
||||
break;
|
||||
}
|
||||
|
||||
results.emplace_back(result.value());
|
||||
if (batch_limit.has_value() && results.size() >= batch_limit.value()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return HandleReadWithoutOrderBy(std::move(req), shard_->Access(req.transaction_id));
|
||||
}
|
||||
|
||||
msgs::ExpandOneResponse resp{};
|
||||
resp.success = action_successful;
|
||||
if (action_successful) {
|
||||
resp.result = std::move(results);
|
||||
}
|
||||
|
||||
return resp;
|
||||
}
|
||||
|
||||
msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) {
|
||||
|
Loading…
Reference in New Issue
Block a user