Add limit to ExpandOne

Add missing pragma
Add test
Merge conflicts
This commit is contained in:
jeremy 2022-10-21 16:32:49 +02:00
parent 1c3bb969e9
commit 55e0dbca80
5 changed files with 248 additions and 72 deletions

View File

@ -9,6 +9,8 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <vector>
#include "db_accessor.hpp"

View File

@ -19,43 +19,6 @@
namespace memgraph::storage::v3 {
std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable,
std::vector<msgs::OrderBy> &order_bys) {
std::vector<Element> ordered;
ordered.reserve(acc.ApproximateVertexCount());
std::vector<Ordering> ordering;
ordering.reserve(order_bys.size());
for (const auto &order : order_bys) {
switch (order.direction) {
case memgraph::msgs::OrderingDirection::ASCENDING: {
ordering.push_back(Ordering::ASC);
break;
}
case memgraph::msgs::OrderingDirection::DESCENDING: {
ordering.push_back(Ordering::DESC);
break;
}
}
}
auto compare_typed_values = TypedValueVectorCompare(ordering);
for (auto it = vertices_iterable.begin(); it != vertices_iterable.end(); ++it) {
std::vector<TypedValue> properties_order_by;
properties_order_by.reserve(order_bys.size());
for (const auto &order_by : order_bys) {
const auto val =
ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, expr::identifier_node_symbol, "");
properties_order_by.push_back(val);
}
ordered.push_back({std::move(properties_order_by), *it});
}
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
});
return ordered;
}
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
const std::vector<PropertyValue> &start_ids, const View view) {
auto it = vertex_iterable.begin();
@ -68,15 +31,47 @@ VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_itera
return it;
}
std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements,
const std::vector<PropertyValue> &start_ids,
const View view) {
std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator(
const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids,
const View view) {
for (auto it = ordered_elements.begin(); it != ordered_elements.end(); ++it) {
if (const auto &vertex = it->vertex_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) {
if (const auto &vertex = it->object_acc; start_ids <= vertex.PrimaryKey(view).GetValue()) {
return it;
}
}
return ordered_elements.end();
}
std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor,
const msgs::EdgeDirection direction) {
switch (direction) {
case memgraph::msgs::EdgeDirection::IN: {
auto edges = vertex_accessor.InEdges(View::OLD);
if (edges.HasValue()) {
return edges.GetValue();
}
return {};
}
case memgraph::msgs::EdgeDirection::OUT: {
auto edges = vertex_accessor.OutEdges(View::OLD);
if (edges.HasValue()) {
return edges.GetValue();
}
return {};
}
case memgraph::msgs::EdgeDirection::BOTH: {
auto maybe_in_edges = vertex_accessor.InEdges(View::OLD);
auto maybe_out_edges = vertex_accessor.OutEdges(View::OLD);
std::vector<EdgeAccessor> edges;
if (maybe_in_edges.HasValue()) {
edges = maybe_in_edges.GetValue();
}
if (maybe_out_edges.HasValue()) {
edges.insert(edges.end(), maybe_out_edges.GetValue().begin(), maybe_out_edges.GetValue().end());
}
return edges;
}
}
}
} // namespace memgraph::storage::v3

View File

@ -9,14 +9,21 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <vector>
#include "ast/ast.hpp"
#include "pretty_print_ast_to_original_expression.hpp" // #NoCommit why like this?
#include "query/v2/requests.hpp"
#include "storage/v3/bindings/typed_value.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/expr.hpp"
#include "storage/v3/shard.hpp"
#include "storage/v3/vertex_accessor.hpp"
namespace memgraph::storage::v3 {
template <typename T>
concept ObjectAccessor = std::is_same_v<T, VertexAccessor> || std::is_same_v<T, EdgeAccessor>;
inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) {
// in ordering null comes after everything else
@ -99,18 +106,62 @@ class TypedValueVectorCompare final {
std::vector<Ordering> ordering_;
};
template <ObjectAccessor TObjectAccessor>
struct Element {
std::vector<TypedValue> properties_order_by;
VertexAccessor vertex_acc;
TObjectAccessor object_acc;
};
std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable,
std::vector<msgs::OrderBy> &order_bys);
// #NoCommit in cpp
template <ObjectAccessor TObjectAccessor, typename TIterable>
std::vector<Element<TObjectAccessor>> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, TIterable &iterable,
std::vector<msgs::OrderBy> &order_bys) {
std::vector<Element<TObjectAccessor>> ordered;
ordered.reserve(acc.ApproximateVertexCount());
std::vector<Ordering> ordering;
ordering.reserve(order_bys.size());
for (const auto &order : order_bys) {
switch (order.direction) {
case memgraph::msgs::OrderingDirection::ASCENDING: {
ordering.push_back(Ordering::ASC);
break;
}
case memgraph::msgs::OrderingDirection::DESCENDING: {
ordering.push_back(Ordering::DESC);
break;
}
}
}
auto compare_typed_values = TypedValueVectorCompare(ordering);
auto it = iterable.begin();
for (; it != iterable.end(); ++it) {
std::vector<TypedValue> properties_order_by;
properties_order_by.reserve(order_bys.size());
for (const auto &order_by : order_bys) {
if constexpr (std::is_same_v<TIterable, VerticesIterable>) {
properties_order_by.push_back(ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression,
expr::identifier_node_symbol, expr::identifier_edge_symbol));
} else {
properties_order_by.push_back(ComputeExpression(dba, std::nullopt, *it, order_by.expression.expression,
expr::identifier_node_symbol, expr::identifier_edge_symbol));
}
}
ordered.push_back({std::move(properties_order_by), *it});
}
std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) {
return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by);
});
return ordered;
}
VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable,
const std::vector<PropertyValue> &start_ids, View view);
std::vector<Element>::const_iterator GetStartOrderedElementsIterator(const std::vector<Element> &ordered_elements,
const std::vector<PropertyValue> &start_ids,
View view);
std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIterator(
const std::vector<Element<VertexAccessor>> &ordered_elements, const std::vector<PropertyValue> &start_ids,
View view);
std::vector<EdgeAccessor> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction);
} // namespace memgraph::storage::v3

View File

@ -15,6 +15,7 @@
#include <unordered_set>
#include <utility>
#include <unordered_map>
#include "parser/opencypher/parser.hpp"
#include "query/v2/requests.hpp"
#include "storage/v3/bindings/ast/ast.hpp"
@ -26,6 +27,7 @@
#include "storage/v3/bindings/symbol_generator.hpp"
#include "storage/v3/bindings/symbol_table.hpp"
#include "storage/v3/bindings/typed_value.hpp"
#include "storage/v3/edge_accessor.hpp"
#include "storage/v3/expr.hpp"
#include "storage/v3/id_types.hpp"
#include "storage/v3/key_store.hpp"
@ -802,18 +804,18 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) {
uint64_t sample_counter{0};
auto vertex_iterable = acc.Vertices(view);
if (!req.order_bys.empty()) {
const auto ordered = OrderByElements(acc, dba, vertex_iterable, req.order_bys);
const auto ordered = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, req.order_bys);
// we are traversing Elements
auto it = GetStartOrderedElementsIterator(ordered, start_id, View(req.storage_view));
for (; it != ordered.end(); ++it) {
emplace_scan_result(it->vertex_acc);
emplace_scan_result(it->object_acc);
++sample_counter;
if (req.batch_limit && sample_counter == req.batch_limit) {
// Reached the maximum specified batch size.
// Get the next element before exiting.
++it;
if (it != ordered.end()) {
const auto &next_vertex = it->vertex_acc;
const auto &next_vertex = it->object_acc;
next_start_id = ConstructValueVertex(next_vertex, view).vertex_v.id;
}
@ -854,36 +856,97 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) {
bool action_successful = true;
std::vector<msgs::ExpandOneResultRow> results;
auto batch_limit = req.limit;
auto dba = DbAccessor{&acc};
auto maybe_filter_based_on_edge_uniquness = InitializeEdgeUniqunessFunction(req.only_unique_neighbor_rows);
auto edge_filler = InitializeEdgeFillerFunction(req);
for (auto &src_vertex : req.src_vertices) {
// Get Vertex acc
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
if (!src_vertex_acc_opt) {
action_successful = false;
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
//#NoCommit code below is duplicated, one needs to factor it once it's clear
if (req.order_by) {
auto vertex_iterable = acc.Vertices(View::OLD);
const auto ordered_vertices = OrderByElements<VertexAccessor>(acc, dba, vertex_iterable, *req.order_by);
std::vector<std::pair<VertexAccessor, std::vector<Element<EdgeAccessor>>>> vertex_ordered_edges;
vertex_ordered_edges.reserve(req.src_vertices.size());
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
auto dba = DbAccessor{&acc};
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
for (auto &src_vertex : req.src_vertices) {
// Get Vertex acc
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
if (!src_vertex_acc_opt) {
action_successful = false;
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
auto dba = DbAccessor{&acc};
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
if (!result) {
action_successful = false;
break;
}
results.emplace_back(result.value());
if (batch_limit) { // #NoCommit can ebd one differently
--*batch_limit;
if (batch_limit < 0) {
break;
}
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
// Now we have to construct response like this, and here we will care about
// limit:
// v1 e1
// v1 e2
// v1 e3
// v2 e4
// v2 e5
// v2 e6
// v2 e7
// v3 e8
// v3 e9
} else {
for (auto &src_vertex : req.src_vertices) {
// Get Vertex acc
auto src_vertex_acc_opt = acc.FindVertex(ConvertPropertyVector((src_vertex.second)), View::NEW);
if (!src_vertex_acc_opt) {
action_successful = false;
spdlog::debug("Encountered an error while trying to obtain VertexAccessor. Transaction id: {}",
req.transaction_id.logical_id);
break;
}
if (!result) {
action_successful = false;
break;
if (!req.filters.empty()) {
// NOTE - DbAccessor might get removed in the future.
auto dba = DbAccessor{&acc};
const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol);
if (!eval) {
continue;
}
}
auto result = GetExpandOneResult(acc, src_vertex, req, maybe_filter_based_on_edge_uniquness, edge_filler);
if (!result) {
action_successful = false;
break;
}
results.emplace_back(result.value());
if (batch_limit) { // #NoCommit can ebd one differently
--*batch_limit;
if (batch_limit < 0) {
break;
}
}
}
results.emplace_back(result.value());
}
msgs::ExpandOneResponse resp{};

View File

@ -680,6 +680,61 @@ void AttemptToExpandOneWithUniqueEdges(ShardClient &client, uint64_t src_vertex_
}
}
void AttemptToExpandOneLimitAndOrderBy(ShardClient &client, uint64_t src_vertex_val, EdgeTypeId edge_type_id) {
// Source vertex
msgs::Label label = {.id = get_primary_label()};
auto src_vertex = std::make_pair(label, GetPrimaryKey(src_vertex_val));
// Edge type
auto edge_type = msgs::EdgeType{};
edge_type.id = edge_type_id;
// Edge direction
auto edge_direction = msgs::EdgeDirection::OUT;
// Source Vertex properties to look for
std::optional<std::vector<PropertyId>> src_vertex_properties = {};
// Edge properties to look for
std::optional<std::vector<PropertyId>> edge_properties = {};
std::vector<std::string> expressions;
std::optional<std::vector<msgs::OrderBy>> order_by = {};
// std::optional<size_t> limit = 5;
// std::optional<msgs::Filter> filter = {};
msgs::ExpandOneRequest expand_one_req{};
expand_one_req.direction = edge_direction;
expand_one_req.edge_properties = edge_properties;
expand_one_req.edge_types = {edge_type};
// expand_one_req.expressions = expressions; #NoCommit not existing?
// expand_one_req.filter = filter;
// expand_one_req.limit = limit;
expand_one_req.order_by = order_by;
expand_one_req.src_vertex_properties = src_vertex_properties;
expand_one_req.src_vertices = {src_vertex};
expand_one_req.transaction_id.logical_id = GetTransactionId();
while (true) {
auto read_res = client.SendReadRequest(expand_one_req);
if (read_res.HasError()) {
continue;
}
auto write_response_result = read_res.GetValue();
auto write_response = std::get<msgs::ExpandOneResponse>(write_response_result);
MG_ASSERT(write_response.result.size() == 1);
// MG_ASSERT(write_response.result[0].edges_with_all_properties->size() == 10); // #NoCommit where does that come
// from?
// auto number_of_properties_on_edge =
// (std::get<std::map<PropertyId, msgs::Value>>(write_response.result[0].edges_with_all_properties.value()[0]))
// .size();
// MG_ASSERT(number_of_properties_on_edge == 1);
break;
}
}
void AttemptToExpandOneWithSpecifiedSrcVertexProperties(ShardClient &client, uint64_t src_vertex_val,
EdgeTypeId edge_type_id) {
// Source vertex
@ -1021,6 +1076,9 @@ void TestExpandOneGraphOne(ShardClient &client) {
auto edge_prop_id = GetUniqueInteger();
auto edge_prop_val = GetUniqueInteger();
std::vector<uint64_t> edges_ids(10);
std::generate(edges_ids.begin(), edges_ids.end(), GetUniqueInteger);
// (V1)-[edge_type_id]->(V2)
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid_1, edge_prop_id,
edge_prop_val, {edge_type_id}));
@ -1028,6 +1086,13 @@ void TestExpandOneGraphOne(ShardClient &client) {
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_3, edge_gid_2, edge_prop_id,
edge_prop_val, {edge_type_id}));
// (V2)-[edge_type_id]->(V3) x 10
std::for_each(edges_ids.begin(), edges_ids.end(), [&](const auto &edge_id) {
MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_2, unique_prop_val_3, edge_id, edge_prop_id,
edge_prop_val, {edge_type_id}));
});
AttemptToExpandOneLimitAndOrderBy(client, unique_prop_val_2, edge_type_id); // #NoCommit
AttemptToExpandOneSimple(client, unique_prop_val_1, edge_type_id);
AttemptToExpandOneWithWrongEdgeType(client, unique_prop_val_1, wrong_edge_type_id);
AttemptToExpandOneWithSpecifiedSrcVertexProperties(client, unique_prop_val_1, edge_type_id);