From 9ec72bd96985bc3ecea5031f18dd90292daa5bc3 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Wed, 16 Nov 2022 18:41:22 +0200 Subject: [PATCH 1/9] Add GetProperties shard handler and tests --- src/query/v2/requests.hpp | 40 +++-- src/storage/v3/request_helper.cpp | 62 +++++++- src/storage/v3/request_helper.hpp | 11 ++ src/storage/v3/shard_rsm.cpp | 138 ++++++++++++++++- tests/simulation/shard_rsm.cpp | 248 +++++++++++++++++++++++++++++- 5 files changed, 474 insertions(+), 25 deletions(-) diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 2ea4efb57..4c9280848 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -321,10 +321,6 @@ struct Expression { std::string expression; }; -struct Filter { - std::string filter_expression; -}; - enum class OrderingDirection { ASCENDING = 1, DESCENDING = 2 }; struct OrderBy { @@ -366,22 +362,42 @@ struct ScanVerticesResponse { std::vector<ScanResultRow> results; }; -using VertexOrEdgeIds = std::variant<VertexId, EdgeId>; +struct VertexAndEdgeId { + VertexId vertex; + std::optional<EdgeTypeId> edge; +}; struct GetPropertiesRequest { Hlc transaction_id; - // Shouldn't contain mixed vertex and edge ids - VertexOrEdgeIds vertex_or_edge_ids; + std::vector<VertexAndEdgeId> vertices_and_edges; + std::vector<PropertyId> property_ids; - std::vector<Expression> expressions; - bool only_unique = false; - std::optional<std::vector<OrderBy>> order_by; + std::vector<std::string> expressions; + + std::vector<OrderBy> order_by; std::optional<size_t> limit; - std::optional<Filter> filter; + + // Return only the properties of the vertices or edges that the filter predicate + // evaluates to true + std::optional<std::string> filter; +}; + +struct PropIdValue { + std::vector<PropertyId> ids; + std::vector<Value> properties; +}; + +struct GetPropertiesResultRow { + VertexAndEdgeId vertex_and_edge; + + PropIdValue properies_and_ids; + std::vector<Value> evaluated_expressions; }; struct GetPropertiesResponse { - bool success; + std::vector<GetPropertiesResultRow> result_row; + enum RequestResult : uint16_t { OUT_OF_SHARD_RANGE, SUCCESS, FAILURE }; + RequestResult result; }; enum class EdgeDirection : uint8_t { OUT = 1, IN = 2, BOTH = 3 }; diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index bb1c8bca4..96170dccf 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -11,6 +11,7 @@ #include "storage/v3/request_helper.hpp" +#include <iterator> #include <vector> #include "pretty_print_ast_to_original_expression.hpp" @@ -43,19 +44,74 @@ std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, Vert properties_order_by.reserve(order_bys.size()); for (const auto &order_by : order_bys) { - const auto val = + auto val = ComputeExpression(dba, *it, std::nullopt, order_by.expression.expression, expr::identifier_node_symbol, ""); - properties_order_by.push_back(val); + properties_order_by.push_back(std::move(val)); } ordered.push_back({std::move(properties_order_by), *it}); } - std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + std::sort(ordered.begin(), ordered.end(), [&compare_typed_values](const auto &pair1, const auto &pair2) { return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); }); return ordered; } +std::vector<GetPropElement> OrderByElements(DbAccessor &dba, std::vector<msgs::OrderBy> &order_by, + std::vector<GetPropElement> &&vertices) { + std::vector<Ordering> ordering; + ordering.reserve(order_by.size()); + for (const auto &order : order_by) { + switch (order.direction) { + case memgraph::msgs::OrderingDirection::ASCENDING: { + ordering.push_back(Ordering::ASC); + break; + } + case memgraph::msgs::OrderingDirection::DESCENDING: { + ordering.push_back(Ordering::DESC); + break; + } + } + } + struct PropElement { + std::vector<TypedValue> properties_order_by; + VertexAccessor vertex_acc; + GetPropElement *original_element; + }; + + std::vector<PropElement> ordered; + auto compare_typed_values = TypedValueVectorCompare(ordering); + for (auto &vertex : vertices) { + std::vector<TypedValue> properties; + properties.reserve(order_by.size()); + const auto *symbol = (vertex.edge_acc) ? expr::identifier_edge_symbol : expr::identifier_node_symbol; + for (const auto &order : order_by) { + TypedValue val; + if (vertex.edge_acc) { + val = ComputeExpression(dba, vertex.vertex_acc, vertex.edge_acc, order.expression.expression, "", symbol); + } else { + val = ComputeExpression(dba, vertex.vertex_acc, vertex.edge_acc, order.expression.expression, symbol, ""); + } + properties.push_back(std::move(val)); + } + + ordered.push_back({std::move(properties), vertex.vertex_acc, &vertex}); + } + + std::sort(ordered.begin(), ordered.end(), [&compare_typed_values](const auto &lhs, const auto &rhs) { + return compare_typed_values(lhs.properties_order_by, rhs.properties_order_by); + }); + + std::vector<GetPropElement> results_ordered; + results_ordered.reserve(ordered.size()); + + for (auto &elem : ordered) { + results_ordered.push_back(std::move(*elem.original_element)); + } + + return results_ordered; +} + VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &start_ids, const View view) { auto it = vertex_iterable.begin(); diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 24ed40f8c..f98f18cd3 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -12,6 +12,7 @@ #include <vector> #include "ast/ast.hpp" +#include "query/v2/requests.hpp" #include "storage/v3/bindings/typed_value.hpp" #include "storage/v3/shard.hpp" #include "storage/v3/vertex_accessor.hpp" @@ -104,9 +105,19 @@ struct Element { VertexAccessor vertex_acc; }; +struct GetPropElement { + std::vector<TypedValue> properties_order_by; + std::vector<PropertyId> ids; + VertexAccessor vertex_acc; + std::optional<EdgeAccessor> edge_acc; +}; + std::vector<Element> OrderByElements(Shard::Accessor &acc, DbAccessor &dba, VerticesIterable &vertices_iterable, std::vector<msgs::OrderBy> &order_bys); +std::vector<GetPropElement> OrderByElements(DbAccessor &dba, std::vector<msgs::OrderBy> &order_bys, + std::vector<GetPropElement> &&vertices); + VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &start_ids, View view); diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index f288e5e27..4036bbd80 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -15,8 +15,10 @@ #include <optional> #include <unordered_set> #include <utility> +#include <variant> #include "parser/opencypher/parser.hpp" +#include "pretty_print_ast_to_original_expression.hpp" #include "query/v2/requests.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/view.hpp" @@ -29,6 +31,7 @@ #include "storage/v3/bindings/symbol_generator.hpp" #include "storage/v3/bindings/symbol_table.hpp" #include "storage/v3/bindings/typed_value.hpp" +#include "storage/v3/conversions.hpp" #include "storage/v3/expr.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/key_store.hpp" @@ -159,10 +162,15 @@ std::optional<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(cons } bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, - const std::string_view node_name) { - return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { - auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); - return res.IsBool() && res.ValueBool(); + const std::string_view node_name, const std::optional<EdgeAccessor> &e_acc = std::nullopt) { + return std::ranges::all_of(filters, [&node_name, &dba, &v_acc, &e_acc](const auto &filter_expr) { + TypedValue result; + if (e_acc) { + result = ComputeExpression(dba, v_acc, e_acc, filter_expr, "", node_name); + } else { + result = ComputeExpression(dba, v_acc, e_acc, filter_expr, node_name, ""); + } + return result.IsBool() && result.ValueBool(); }); } @@ -936,9 +944,125 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) { return msgs::CommitResponse{true}; }; -// NOLINTNEXTLINE(readability-convert-member-functions-to-static) -msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) { - return msgs::GetPropertiesResponse{}; +msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { + if (req.vertices_and_edges.empty()) { + return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::FAILURE}; + } + + auto shard_acc = shard_->Access(req.transaction_id); + auto dba = DbAccessor{&shard_acc}; + const auto view = storage::v3::View::NEW; + + auto collect_props = [](const VertexAccessor &acc, const std::vector<PropertyId> &props, View view, + const std::optional<EdgeAccessor> &e_acc) mutable -> std::optional<GetPropElement> { + std::vector<TypedValue> properties; + std::vector<PropertyId> ids; + for (const auto &prop : props) { + Result<PropertyValue> result{PropertyValue()}; + if (e_acc) { + result = e_acc->GetProperty(prop, view); + } else { + result = acc.GetProperty(prop, view); + } + if (result.HasError() && result.GetError() == Error::NONEXISTENT_OBJECT) { + continue; + } + if (result.HasError()) { + spdlog::debug("Encountered an Error while trying to get a vertex property."); + return std::nullopt; + } + properties.push_back(PropertyToTypedValue<TypedValue>(result.GetValue())); + ids.push_back(prop); + } + GetPropElement element{std::move(properties), std::move(ids), acc, e_acc}; + return {std::move(element)}; + }; + + auto find_edge = [](const VertexAccessor &v, const EdgeTypeId &e) -> std::optional<EdgeAccessor> { + auto in = v.InEdges(view, {e}); + MG_ASSERT(in.HasValue()); + for (auto &edge : in.GetValue()) { + if (edge.EdgeType() == e) { + return edge; + } + } + + auto out = v.OutEdges(view, {e}); + MG_ASSERT(out.HasValue()); + for (auto &edge : out.GetValue()) { + if (edge.EdgeType() == e) { + return edge; + } + } + return std::nullopt; + }; + + std::vector<GetPropElement> elements; + + for (const auto &[vertex, maybe_edge] : req.vertices_and_edges) { + const auto &[label, pk_v] = vertex; + auto pk = ConvertPropertyVector(pk_v); + auto v_acc = dba.FindVertex(pk, view); + if (!v_acc) { + return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::OUT_OF_SHARD_RANGE}; + } + std::optional<EdgeAccessor> e_acc; + if (maybe_edge) { + e_acc = find_edge(*v_acc, *maybe_edge); + if (!e_acc) { + return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::OUT_OF_SHARD_RANGE}; + } + } + + const auto *symbol = (maybe_edge) ? expr::identifier_edge_symbol : expr::identifier_node_symbol; + if (req.filter && !FilterOnVertex(dba, *v_acc, {*req.filter}, symbol, e_acc)) { + continue; + } + + std::optional<GetPropElement> collected_properties; + collected_properties = collect_props(*v_acc, req.property_ids, view, e_acc); + if (!collected_properties) { + return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::FAILURE}; + } + if (collected_properties->ids.empty()) { + continue; + } + elements.push_back(std::move(*collected_properties)); + } + + if (!req.order_by.empty()) { + elements = OrderByElements(dba, req.order_by, std::move(elements)); + } + + std::vector<msgs::GetPropertiesResultRow> results; + results.reserve(elements.size()); + + const auto has_expr_to_evaluate = !req.expressions.empty(); + size_t limit = elements.size(); + if (req.limit && *req.limit < elements.size()) { + limit = *req.limit; + } + for (size_t index = 0; index != limit; ++index) { + auto &element = elements.at(index); + const auto id = element.vertex_acc.Id(view).GetValue(); + std::optional<EdgeTypeId> e_type = + (element.edge_acc) ? std::make_optional(element.edge_acc->EdgeType()) : std::nullopt; + msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; + results.push_back(msgs::GetPropertiesResultRow{ + .vertex_and_edge = {.vertex = std::move(v_id), .edge = e_type}, + .properies_and_ids = { + .ids = std::move(element.ids), + .properties = ConvertToValueVectorFromTypedValueVector(std::move(element.properties_order_by))}}); + if (has_expr_to_evaluate) { + auto expression_results = ConvertToValueVectorFromTypedValueVector( + EvaluateVertexExpressions(dba, element.vertex_acc, req.expressions, expr::identifier_node_symbol)); + results.back().evaluated_expressions = std::move(expression_results); + } + } + + return msgs::GetPropertiesResponse{std::move(results), msgs::GetPropertiesResponse::SUCCESS}; } +// TODO(kostasrim) Handle edges + } // namespace memgraph::storage::v3 diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 64d0a0861..f2a90262f 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -446,6 +446,56 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression } } +msgs::GetPropertiesResponse AttemptToGetProperties(ShardClient &client, std::vector<PropertyId> properties, + std::vector<msgs::VertexId> vertices, + std::vector<msgs::EdgeTypeId> edges, + std::optional<size_t> limit = std::nullopt, + std::optional<uint64_t> filter_prop = std::nullopt, + bool edge = false, + std::optional<std::string> order_by = std::nullopt) { + msgs::GetPropertiesRequest req{}; + req.transaction_id.logical_id = GetTransactionId(); + req.property_ids = std::move(properties); + + if (filter_prop) { + std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE.prop1 >= " : "MG_SYMBOL_EDGE.e_prop = "; + filter_expr += std::to_string(*filter_prop); + req.filter = std::make_optional(std::move(filter_expr)); + } + if (order_by) { + std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE." : "MG_SYMBOL_EDGE."; + filter_expr += *order_by; + msgs::OrderBy order_by{.expression = {std::move(filter_expr)}, .direction = msgs::OrderingDirection::DESCENDING}; + std::vector<msgs::OrderBy> request_order_by; + request_order_by.push_back(std::move(order_by)); + req.order_by = std::move(request_order_by); + } + if (limit) { + req.limit = limit; + } + req.expressions = {std::string("5 = 5")}; + std::vector<msgs::VertexAndEdgeId> req_v; + for (auto &v : vertices) { + req_v.push_back(msgs::VertexAndEdgeId{.vertex = std::move(v)}); + } + for (auto index = 0; index != edges.size(); ++index) { + req_v[index].edge = edges[index]; + } + req.vertices_and_edges = std::move(req_v); + + while (true) { + auto read_res = client.SendReadRequest(req); + if (read_res.HasError()) { + continue; + } + + auto write_response_result = read_res.GetValue(); + auto write_response = std::get<msgs::GetPropertiesResponse>(write_response_result); + + return write_response; + } +} + void AttemptToScanAllWithOrderByOnPrimaryProperty(ShardClient &client, msgs::VertexId start_id, uint64_t batch_limit) { msgs::ScanVerticesRequest scan_req; scan_req.batch_limit = batch_limit; @@ -1064,6 +1114,193 @@ void TestExpandOneGraphTwo(ShardClient &client) { } } +void TestGetProperties(ShardClient &client) { + const auto unique_prop_val_1 = GetUniqueInteger(); + const auto unique_prop_val_2 = GetUniqueInteger(); + const auto unique_prop_val_3 = GetUniqueInteger(); + const auto unique_prop_val_4 = GetUniqueInteger(); + const auto unique_prop_val_5 = GetUniqueInteger(); + + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_4)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_5)); + + const msgs::Label prim_label = {.id = get_primary_label()}; + const msgs::PrimaryKey prim_key = {msgs::Value(static_cast<int64_t>(unique_prop_val_1))}; + const msgs::VertexId v_id = {prim_label, prim_key}; + const msgs::PrimaryKey prim_key_2 = {msgs::Value(static_cast<int64_t>(unique_prop_val_2))}; + const msgs::VertexId v_id_2 = {prim_label, prim_key_2}; + const msgs::PrimaryKey prim_key_3 = {msgs::Value(static_cast<int64_t>(unique_prop_val_3))}; + const msgs::VertexId v_id_3 = {prim_label, prim_key_3}; + const msgs::PrimaryKey prim_key_4 = {msgs::Value(static_cast<int64_t>(unique_prop_val_4))}; + const msgs::VertexId v_id_4 = {prim_label, prim_key_4}; + const msgs::PrimaryKey prim_key_5 = {msgs::Value(static_cast<int64_t>(unique_prop_val_5))}; + const msgs::VertexId v_id_5 = {prim_label, prim_key_5}; + const auto prop_id_2 = PropertyId::FromUint(2); + const auto prop_id_4 = PropertyId::FromUint(4); + const auto prop_id_5 = PropertyId::FromUint(5); + // Vertices + { + // No properties + const auto result = AttemptToGetProperties(client, {}, {v_id, v_id_2}, {}, std::nullopt, unique_prop_val_2); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(result.result_row.empty()); + } + { + // All properties + const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.properies_and_ids.ids.size() == 3); + MG_ASSERT(elem.properies_and_ids.properties.size() == 3); + } + } + { + // Two properties from two vertices with a filter on unique_prop_5 + const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {}, std::nullopt, + unique_prop_val_5); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 1); + } + { + // One property from three vertices. + const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[1].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[2].properies_and_ids.ids.size() == 1); + } + { + // Same as before but with limit of 1 row + const auto result = + AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(1)); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 1); + } + { + // Same as before but with a limit greater than the elements returned + const auto result = + AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(5)); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + } + { + // Order by on `prop1` (descending) + const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt, + std::nullopt, false, "prop1"); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_3); + MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_2); + MG_ASSERT(result.result_row[2].vertex_and_edge.vertex == v_id); + } + { + // Order by and filter on >= unique_prop_val_3 && assert result row data members + const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5}, {}, + std::nullopt, unique_prop_val_3, false, "prop1"); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_5); + MG_ASSERT(result.result_row[0].properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row[0].properies_and_ids.properties.front() == prim_key_5.front()); + MG_ASSERT(result.result_row[0].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[0].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_4); + MG_ASSERT(result.result_row[1].properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row[1].properies_and_ids.properties.front() == prim_key_4.front()); + MG_ASSERT(result.result_row[1].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[1].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[2].vertex_and_edge.vertex == v_id_3); + MG_ASSERT(result.result_row[2].properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row[2].properies_and_ids.properties.front() == prim_key_3.front()); + MG_ASSERT(result.result_row[2].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[2].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[2].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[2].evaluated_expressions.front() == msgs::Value(true)); + } + + // Edges + const auto edge_gid = GetUniqueInteger(); + const auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); + const auto unique_edge_prop_id = 7; + const auto edge_prop_val = GetUniqueInteger(); + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid, unique_edge_prop_id, + edge_prop_val, {edge_type_id})); + const auto edge_gid_2 = GetUniqueInteger(); + const auto edge_prop_val_2 = GetUniqueInteger(); + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_3, unique_prop_val_4, edge_gid_2, + unique_edge_prop_id, edge_prop_val_2, {edge_type_id})); + const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id); + // no properties + { + const auto result = AttemptToGetProperties(client, {}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(result.result_row.empty()); + } + // properties for two vertices + { + const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 2); + } + // filter + { + const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}, + {}, {edge_prop_val}, true); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 1); + MG_ASSERT(result.result_row.front().vertex_and_edge.edge); + MG_ASSERT(result.result_row.front().vertex_and_edge.edge.value() == edge_type_id); + MG_ASSERT(result.result_row.front().properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row.front().properies_and_ids.properties.front() == + msgs::Value(static_cast<int64_t>(edge_prop_val))); + } + // Order by + { + const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}, + {}, {}, true, "e_prop"); + MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 2); + MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_3); + MG_ASSERT(result.result_row[0].vertex_and_edge.edge); + MG_ASSERT(result.result_row[0].vertex_and_edge.edge.value() == edge_type_id); + MG_ASSERT(result.result_row[0].properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row[0].properies_and_ids.properties.front() == + msgs::Value(static_cast<int64_t>(edge_prop_val_2))); + MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_2); + MG_ASSERT(result.result_row[1].vertex_and_edge.edge); + MG_ASSERT(result.result_row[1].vertex_and_edge.edge.value() == edge_type_id); + MG_ASSERT(result.result_row[1].properies_and_ids.properties.size() == 1); + MG_ASSERT(result.result_row[1].properies_and_ids.properties.front() == + msgs::Value(static_cast<int64_t>(edge_prop_val))); + MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); + } +} + } // namespace int TestMessages() { @@ -1102,9 +1339,12 @@ int TestMessages() { auto shard_ptr2 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema_prop); auto shard_ptr3 = std::make_unique<Shard>(get_primary_label(), min_prim_key, max_prim_key, schema_prop); - shard_ptr1->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); - shard_ptr2->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); - shard_ptr3->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); + shard_ptr1->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); + shard_ptr2->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); + shard_ptr3->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); std::vector<Address> address_for_1{shard_server_2_address, shard_server_3_address}; std::vector<Address> address_for_2{shard_server_1_address, shard_server_3_address}; @@ -1145,6 +1385,8 @@ int TestMessages() { TestExpandOneGraphOne(client); TestExpandOneGraphTwo(client); + // GetProperties tests + TestGetProperties(client); simulator.ShutDown(); SimulatorStats stats = simulator.Stats(); From 7a3caa320cb1044d155b192634cd53b56ca76aa6 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 24 Nov 2022 14:25:20 +0200 Subject: [PATCH 2/9] WiP --- src/query/v2/shard_request_manager.hpp | 92 ++++++++++++++++++++-- src/storage/v3/shard_rsm.cpp | 3 + tests/simulation/CMakeLists.txt | 1 + tests/simulation/shard_request_manager.cpp | 19 ++++- 4 files changed, 108 insertions(+), 7 deletions(-) diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp index 20bae7b97..4db77e645 100644 --- a/src/query/v2/shard_request_manager.hpp +++ b/src/query/v2/shard_request_manager.hpp @@ -105,6 +105,7 @@ struct ExecutionState { class ShardRequestManagerInterface { public: using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; + using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; ShardRequestManagerInterface() = default; ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; @@ -122,7 +123,8 @@ class ShardRequestManagerInterface { ExpandOneRequest request) = 0; virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_edges) = 0; - + virtual std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, + GetPropertiesRequest request) = 0; virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0; @@ -146,6 +148,7 @@ class ShardRequestManager : public ShardRequestManagerInterface { using ShardMap = memgraph::coordinator::ShardMap; using CompoundKey = memgraph::coordinator::PrimaryKey; using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; + using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io) : coord_cli_(std::move(coord)), io_(std::move(io)) {} @@ -353,6 +356,21 @@ class ShardRequestManager : public ShardRequestManagerInterface { return result_rows; } + std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, + GetPropertiesRequest requests) override { + MaybeInitializeExecutionState(state, std::move(requests)); + SendAllRequests(state); + + std::vector<GetPropertiesResponse> responses; + // 2. Block untill all the futures are exhausted + do { + AwaitOnResponses(state, responses); + } while (!state.shard_cache.empty()); + + MaybeCompleteState(state); + return responses; + } + private: enum class PaginatedResponseState { Pending, PartiallyFinished }; @@ -373,6 +391,13 @@ class ShardRequestManager : public ShardRequestManagerInterface { } } + template <typename ExecutionState> + void ThrowIfStateExecuting(ExecutionState &state) const { + if (state.state == ExecutionState::EXECUTING) [[unlikely]] { + throw std::runtime_error("State is completed and must be reset"); + } + } + template <typename ExecutionState> void MaybeCompleteState(ExecutionState &state) const { if (state.requests.empty()) { @@ -508,6 +533,33 @@ class ShardRequestManager : public ShardRequestManagerInterface { state.state = ExecutionState<ExpandOneRequest>::EXECUTING; } + void MaybeInitializeExecutionState(ExecutionState<GetPropertiesRequest> &state, GetPropertiesRequest request) { + ThrowIfStateCompleted(state); + ThrowIfStateExecuting(state); + + std::map<Shard, GetPropertiesRequest> per_shard_request_table; + auto top_level_rqst_template = request; + top_level_rqst_template.transaction_id = transaction_id_; + top_level_rqst_template.vertices_and_edges.clear(); + + state.transaction_id = transaction_id_; + + for (auto &[vertex, maybe_edge] : request.vertices_and_edges) { + auto shard = + shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); + if (!per_shard_request_table.contains(shard)) { + per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); + state.shard_cache.push_back(shard); + } + per_shard_request_table[shard].vertices_and_edges.push_back({std::move(vertex), maybe_edge}); + } + + for (auto &[shard, rqst] : per_shard_request_table) { + state.requests.push_back(std::move(rqst)); + } + state.state = ExecutionState<GetPropertiesRequest>::EXECUTING; + } + StorageClient &GetStorageClientForShard(Shard shard) { if (!storage_cli_manager_.Exists(shard)) { AddStorageClientToManager(shard); @@ -532,7 +584,8 @@ class ShardRequestManager : public ShardRequestManagerInterface { storage_cli_manager_.AddClient(target_shard, std::move(cli)); } - void SendAllRequests(ExecutionState<ScanVerticesRequest> &state) { + template <typename TRequest> + void SendAllRequests(ExecutionState<TRequest> &state) { int64_t shard_idx = 0; for (const auto &request : state.requests) { const auto ¤t_shard = state.shard_cache[shard_idx]; @@ -581,9 +634,6 @@ class ShardRequestManager : public ShardRequestManagerInterface { int64_t request_idx = 0; for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - // This is fine because all new_vertices of each request end up on the same shard - const auto labels = state.requests[request_idx].new_vertices[0].label_ids; - auto &storage_client = GetStorageClientForShard(*shard_it); auto poll_result = storage_client.AwaitAsyncWriteRequest(); @@ -649,6 +699,38 @@ class ShardRequestManager : public ShardRequestManagerInterface { } } + void AwaitOnResponses(ExecutionState<GetPropertiesRequest> &state, std::vector<GetPropertiesResponse> &responses) { + auto &shard_cache_ref = state.shard_cache; + int64_t request_idx = 0; + + for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { + auto &storage_client = GetStorageClientForShard(*shard_it); + + auto poll_result = storage_client.PollAsyncReadRequest(); + if (!poll_result) { + ++shard_it; + ++request_idx; + continue; + } + + if (poll_result->HasError()) { + throw std::runtime_error("GetProperties request timed out"); + } + + ReadResponses response_variant = poll_result->GetValue(); + auto response = std::get<GetPropertiesResponse>(response_variant); + if (response.result != GetPropertiesResponse::SUCCESS) { + throw std::runtime_error("GetProperties request did not succeed"); + } + + responses.push_back(std::move(response)); + shard_it = shard_cache_ref.erase(shard_it); + // Needed to maintain the 1-1 mapping between the ShardCache and the requests. + auto it = state.requests.begin() + request_idx; + state.requests.erase(it); + } + } + void AwaitOnPaginatedRequests(ExecutionState<ScanVerticesRequest> &state, std::vector<ScanVerticesResponse> &responses, std::map<Shard, PaginatedResponseState> &paginated_response_tracker) { diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 3188458ce..9a2d6dcc8 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -518,6 +518,9 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { if (req.vertices_and_edges.empty()) { return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::FAILURE}; } + if (req.property_ids.empty()) { + return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::SUCCESS}; + } auto shard_acc = shard_->Access(req.transaction_id); auto dba = DbAccessor{&shard_acc}; diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index 9e1a4c71e..2d7e6c49c 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -32,3 +32,4 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp) add_simulation_test(sharded_map.cpp) add_simulation_test(shard_rsm.cpp) add_simulation_test(cluster_property_test.cpp) +add_simulation_test(shard_request_manager.cpp) diff --git a/tests/simulation/shard_request_manager.cpp b/tests/simulation/shard_request_manager.cpp index 746ab385f..e62e15318 100644 --- a/tests/simulation/shard_request_manager.cpp +++ b/tests/simulation/shard_request_manager.cpp @@ -221,8 +221,22 @@ void TestExpandOne(msgs::ShardRequestManagerInterface &shard_request_manager) { MG_ASSERT(result_rows.size() == 2); } -template <typename ShardRequestManager> -void TestAggregate(ShardRequestManager &io) {} +void TestGetProperties(msgs::ShardRequestManagerInterface &shard_request_manager) { + using PropVal = msgs::Value; + + auto label_id = shard_request_manager.NameToLabel("test_label"); + msgs::VertexId v1{{label_id}, {PropVal(int64_t(1)), PropVal(int64_t(0))}}; + msgs::VertexId v2{{label_id}, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + + msgs::ExecutionState<msgs::GetPropertiesRequest> state; + msgs::GetPropertiesRequest request; + + request.vertices_and_edges.push_back({v1}); + request.vertices_and_edges.push_back({v2}); + + auto result = shard_request_manager.Request(state, std::move(request)); + MG_ASSERT(result.size() == 2); +} void DoTest() { SimulatorConfig config{ @@ -343,6 +357,7 @@ void DoTest() { TestScanVertices(io); TestCreateVertices(io); TestCreateExpand(io); + TestGetProperties(io); simulator.ShutDown(); From 9621532d3d1993b07ee6a6de77d66a48440fb4c0 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Wed, 30 Nov 2022 14:59:00 +0200 Subject: [PATCH 3/9] Prototype suggested changes and polish PR --- src/query/v2/requests.hpp | 22 +-- src/storage/v3/request_helper.cpp | 145 ++++++++--------- src/storage/v3/request_helper.hpp | 72 +++++++-- src/storage/v3/shard_rsm.cpp | 251 +++++++++++++++++++----------- tests/simulation/shard_rsm.cpp | 145 +++++++++-------- 5 files changed, 360 insertions(+), 275 deletions(-) diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index c751b16ec..9ff9a1bae 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -368,16 +368,12 @@ struct ScanVerticesResponse { std::vector<ScanResultRow> results; }; -struct VertexAndEdgeId { - VertexId vertex; - std::optional<EdgeTypeId> edge; -}; - struct GetPropertiesRequest { Hlc transaction_id; - std::vector<VertexAndEdgeId> vertices_and_edges; + std::vector<VertexId> vertex_ids; + std::vector<std::pair<VertexId, EdgeId>> vertices_and_edges; - std::vector<PropertyId> property_ids; + std::optional<std::vector<PropertyId>> property_ids; std::vector<std::string> expressions; std::vector<OrderBy> order_by; @@ -388,22 +384,16 @@ struct GetPropertiesRequest { std::optional<std::string> filter; }; -struct PropIdValue { - std::vector<PropertyId> ids; - std::vector<Value> properties; -}; - struct GetPropertiesResultRow { - VertexAndEdgeId vertex_and_edge; + VertexId vertex; + std::optional<EdgeId> edge; - PropIdValue properies_and_ids; + std::vector<std::pair<PropertyId, Value>> props; std::vector<Value> evaluated_expressions; }; struct GetPropertiesResponse { std::vector<GetPropertiesResultRow> result_row; - enum RequestResult : uint16_t { OUT_OF_SHARD_RANGE, SUCCESS, FAILURE }; - RequestResult result; std::optional<ShardError> error; }; diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 7d367d850..260147a25 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -14,6 +14,7 @@ #include <iterator> #include <vector> +#include "pretty_print_ast_to_original_expression.hpp" #include "storage/v3/bindings/db_accessor.hpp" #include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp" #include "storage/v3/expr.hpp" @@ -221,30 +222,39 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA return evaluated_expressions; } +std::vector<TypedValue> EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector<std::string> &expressions) { + std::vector<TypedValue> evaluated_expressions; + evaluated_expressions.reserve(expressions.size()); + + std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), + [&dba, &v_acc, &e_acc](const auto &expression) { + return ComputeExpression(dba, v_acc, e_acc, expression, expr::identifier_node_symbol, + expr::identifier_edge_symbol); + }); + + return evaluated_expressions; +} + ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema) { - std::map<PropertyId, Value> ret; - auto props = acc.Properties(view); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex properties."); - return props.GetError(); + auto ret = impl::CollectAllPropertiesImpl<VertexAccessor>(acc, view); + if (ret.HasError()) { + return ret.GetError(); } - auto &properties = props.GetValue(); - std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), - [](std::pair<const PropertyId, PropertyValue> &pair) { - return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); - }); - properties.clear(); - auto pks = PrimaryKeysFromAccessor(acc, view, schema); if (pks) { - ret.merge(*pks); + ret.GetValue().merge(*pks); } return ret; } +ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view) { + return impl::CollectAllPropertiesImpl(acc, view); +} + EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) { // Functions to select connecting edges based on uniquness EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness; @@ -351,15 +361,19 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { return edge_filler; } -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, - const std::string_view node_name, const std::optional<EdgeAccessor> &e_acc) { - return std::ranges::all_of(filters, [&node_name, &dba, &v_acc, &e_acc](const auto &filter_expr) { - TypedValue result; - if (e_acc) { - result = ComputeExpression(dba, v_acc, e_acc, filter_expr, "", node_name); - } else { - result = ComputeExpression(dba, v_acc, e_acc, filter_expr, node_name, ""); - } +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, + const std::vector<std::string> &filters) { + return std::ranges::all_of(filters, [&dba, &v_acc](const auto &filter_expr) { + const auto result = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, expr::identifier_node_symbol, ""); + return result.IsBool() && result.ValueBool(); + }); +} + +bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector<std::string> &filters) { + return std::ranges::all_of(filters, [&dba, &v_acc, &e_acc](const auto &filter_expr) { + const auto result = + ComputeExpression(dba, v_acc, e_acc, filter_expr, expr::identifier_node_symbol, expr::identifier_edge_symbol); return result.IsBool() && result.ValueBool(); }); } @@ -444,61 +458,6 @@ ShardResult<msgs::ExpandOneResultRow> GetExpandOneResult( return result_row; } -std::vector<GetPropElement> OrderByElements(DbAccessor &dba, std::vector<msgs::OrderBy> &order_by, - std::vector<GetPropElement> &&vertices) { - std::vector<Ordering> ordering; - ordering.reserve(order_by.size()); - for (const auto &order : order_by) { - switch (order.direction) { - case memgraph::msgs::OrderingDirection::ASCENDING: { - ordering.push_back(Ordering::ASC); - break; - } - case memgraph::msgs::OrderingDirection::DESCENDING: { - ordering.push_back(Ordering::DESC); - break; - } - } - } - struct PropElement { - std::vector<TypedValue> properties_order_by; - VertexAccessor vertex_acc; - GetPropElement *original_element; - }; - - std::vector<PropElement> ordered; - auto compare_typed_values = TypedValueVectorCompare(ordering); - for (auto &vertex : vertices) { - std::vector<TypedValue> properties; - properties.reserve(order_by.size()); - const auto *symbol = (vertex.edge_acc) ? expr::identifier_edge_symbol : expr::identifier_node_symbol; - for (const auto &order : order_by) { - TypedValue val; - if (vertex.edge_acc) { - val = ComputeExpression(dba, vertex.vertex_acc, vertex.edge_acc, order.expression.expression, "", symbol); - } else { - val = ComputeExpression(dba, vertex.vertex_acc, vertex.edge_acc, order.expression.expression, symbol, ""); - } - properties.push_back(std::move(val)); - } - - ordered.push_back({std::move(properties), vertex.vertex_acc, &vertex}); - } - - std::sort(ordered.begin(), ordered.end(), [&compare_typed_values](const auto &lhs, const auto &rhs) { - return compare_typed_values(lhs.properties_order_by, rhs.properties_order_by); - }); - - std::vector<GetPropElement> results_ordered; - results_ordered.reserve(ordered.size()); - - for (auto &elem : ordered) { - results_ordered.push_back(std::move(*elem.original_element)); - } - - return results_ordered; -} - VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &primary_key, const View view) { auto it = vertex_iterable.begin(); @@ -584,4 +543,36 @@ std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<Edg return ordered; } +std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> OrderByEdges( + DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges, + const std::vector<VertexAccessor> &vertex_acc) { + MG_ASSERT(vertex_acc.size() == iterable.size()); + std::vector<Ordering> ordering; + ordering.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(ordering), + [](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); }); + + std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> ordered; + VertexAccessor current = vertex_acc.front(); + size_t id = 0; + for (auto it = iterable.begin(); it != iterable.end(); it++, id++) { + current = vertex_acc[id]; + std::vector<TypedValue> properties_order_by; + properties_order_by.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(properties_order_by), + [&dba, it, current](const auto &order_by) { + return ComputeExpression(dba, current, *it, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol); + }); + + ordered.push_back({std::move(properties_order_by), {current, *it}}); + } + + auto compare_typed_values = TypedValueVectorCompare(ordering); + std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + }); + return ordered; +} + } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index f14535beb..13871d219 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -20,6 +20,7 @@ #include "storage/v3/edge_accessor.hpp" #include "storage/v3/expr.hpp" #include "storage/v3/shard.hpp" +#include "storage/v3/value_conversions.hpp" #include "storage/v3/vertex_accessor.hpp" #include "utils/template_utils.hpp" @@ -31,7 +32,7 @@ using EdgeFiller = using msgs::Value; template <typename T> -concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>; +concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>; inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { // in ordering null comes after everything else @@ -131,13 +132,6 @@ struct Element { TObjectAccessor object_acc; }; -struct GetPropElement { - std::vector<TypedValue> properties_order_by; - std::vector<PropertyId> ids; - VertexAccessor vertex_acc; - std::optional<EdgeAccessor> edge_acc; -}; - template <typename T> concept VerticesIt = utils::SameAsAnyOf<T, VerticesIterable, std::vector<VertexAccessor>>; @@ -170,12 +164,16 @@ std::vector<Element<VertexAccessor>> OrderByVertices(DbAccessor &dba, TIterable return ordered; } +template <typename T> +concept EdgeObjectAccessor = utils::SameAsAnyOf<T, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>; + std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges, const VertexAccessor &vertex_acc); -std::vector<GetPropElement> OrderByElements(DbAccessor &dba, std::vector<msgs::OrderBy> &order_bys, - std::vector<GetPropElement> &&vertices); +std::vector<Element<std::pair<VertexAccessor, EdgeAccessor>>> OrderByEdges( + DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges, + const std::vector<VertexAccessor> &vertex_acc); VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector<PropertyValue> &primary_key, View view); @@ -187,19 +185,65 @@ std::vector<Element<VertexAccessor>>::const_iterator GetStartOrderedElementsIter std::array<std::vector<EdgeAccessor>, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction); -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters, - const std::string_view node_name, const std::optional<EdgeAccessor> &e_acc = std::nullopt); +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector<std::string> &filters); + +bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector<std::string> &filters); std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const std::vector<std::string> &expressions, std::string_view node_name); -ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, +std::vector<TypedValue> EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector<std::string> &expressions); + +template <typename T> +concept TAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>; + +template <typename TAccessor> +ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const TAccessor &acc, const std::vector<PropertyId> &props, - View view); + View view) { + std::map<PropertyId, Value> ret; + + for (const auto &prop : props) { + auto result = acc.GetProperty(prop, view); + if (result.HasError()) { + spdlog::debug("Encountered an Error while trying to get a vertex property."); + return result.GetError(); + } + auto &value = result.GetValue(); + ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); + } + + return ret; +} ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema); +namespace impl { +template <typename TAccessor> +ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesImpl(const TAccessor &acc, View view) { + std::map<PropertyId, Value> ret; + auto props = acc.Properties(view); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex properties."); + return props.GetError(); + } + + auto &properties = props.GetValue(); + std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), + [](std::pair<const PropertyId, PropertyValue> &pair) { + return std::make_pair(pair.first, conversions::FromPropertyValueToValue(std::move(pair.second))); + }); + return ret; +} +} // namespace impl + +template <typename TAccessor> +ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const TAccessor &acc, View view) { + return impl::CollectAllPropertiesImpl<TAccessor>(acc, view); +} EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows); diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 3188458ce..b628127af 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -10,6 +10,8 @@ // licenses/APL.txt. #include <algorithm> +#include <exception> +#include <experimental/source_location> #include <functional> #include <iterator> #include <optional> @@ -19,7 +21,6 @@ #include "common/errors.hpp" #include "parser/opencypher/parser.hpp" -#include "pretty_print_ast_to_original_expression.hpp" #include "query/v2/requests.hpp" #include "storage/v2/vertex.hpp" #include "storage/v2/view.hpp" @@ -330,7 +331,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { std::vector<Value> expression_results; if (!req.filter_expressions.empty()) { // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions, expr::identifier_node_symbol); + const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions); if (!eval) { return; } @@ -435,7 +436,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { } if (!req.filters.empty()) { // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters); if (!eval) { continue; } @@ -515,124 +516,184 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) { }; msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { - if (req.vertices_and_edges.empty()) { - return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::FAILURE}; + if (!req.vertex_ids.empty() && !req.vertices_and_edges.empty()) { + auto error = CreateErrorResponse( + {common::ErrorCode::VERTEX_HAS_EDGES, std::experimental::source_location::current()}, req.transaction_id, ""); + return msgs::GetPropertiesResponse{.error = {}}; } auto shard_acc = shard_->Access(req.transaction_id); auto dba = DbAccessor{&shard_acc}; const auto view = storage::v3::View::NEW; - auto collect_props = [](const VertexAccessor &acc, const std::vector<PropertyId> &props, View view, - const std::optional<EdgeAccessor> &e_acc) mutable -> std::optional<GetPropElement> { - std::vector<TypedValue> properties; - std::vector<PropertyId> ids; - for (const auto &prop : props) { - ShardResult<PropertyValue> result{PropertyValue()}; - if (e_acc) { - result = e_acc->GetProperty(prop, view); - } else { - result = acc.GetProperty(prop, view); - } - if (result.HasError() && result.GetError() == common::ErrorCode::NONEXISTENT_OBJECT) { - continue; - } - if (result.HasError()) { - spdlog::debug("Encountered an Error while trying to get a vertex property."); - return std::nullopt; - } - properties.push_back(PropertyToTypedValue<TypedValue>(result.GetValue())); - ids.push_back(prop); + auto transform_props = [](std::map<PropertyId, Value> &&value) { + std::vector<std::pair<PropertyId, Value>> result; + result.reserve(value.size()); + for (auto &[id, val] : value) { + result.push_back({id, std::move(val)}); } - GetPropElement element{std::move(properties), std::move(ids), acc, e_acc}; - return {std::move(element)}; + return result; }; - auto find_edge = [](const VertexAccessor &v, const EdgeTypeId &e) -> std::optional<EdgeAccessor> { - auto in = v.InEdges(view, {e}); + auto collect_props = [&req](const VertexAccessor &v_acc, const std::optional<EdgeAccessor> &e_acc) { + if (req.property_ids) { + if (e_acc) { + return CollectAllPropertiesFromAccessor(*e_acc, view); + } + return CollectAllPropertiesFromAccessor(v_acc, view); + } + + if (e_acc) { + return CollectSpecificPropertiesFromAccessor(v_acc, *req.property_ids, view); + } + return CollectSpecificPropertiesFromAccessor(*e_acc, *req.property_ids, view); + }; + + auto find_edge = [](const VertexAccessor &v, msgs::EdgeId e) -> std::optional<EdgeAccessor> { + auto in = v.InEdges(view); MG_ASSERT(in.HasValue()); for (auto &edge : in.GetValue()) { - if (edge.EdgeType() == e) { + if (edge.Gid().AsUint() == e.gid) { return edge; } } - - auto out = v.OutEdges(view, {e}); + auto out = v.OutEdges(view); MG_ASSERT(out.HasValue()); for (auto &edge : out.GetValue()) { - if (edge.EdgeType() == e) { + if (edge.Gid().AsUint() == e.gid) { return edge; } } return std::nullopt; }; - std::vector<GetPropElement> elements; - - for (const auto &[vertex, maybe_edge] : req.vertices_and_edges) { - const auto &[label, pk_v] = vertex; - auto pk = ConvertPropertyVector(pk_v); - auto v_acc = dba.FindVertex(pk, view); - if (!v_acc) { - return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::OUT_OF_SHARD_RANGE}; - } - std::optional<EdgeAccessor> e_acc; - if (maybe_edge) { - e_acc = find_edge(*v_acc, *maybe_edge); - if (!e_acc) { - return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::OUT_OF_SHARD_RANGE}; - } - } - - const auto *symbol = (maybe_edge) ? expr::identifier_edge_symbol : expr::identifier_node_symbol; - if (req.filter && !FilterOnVertex(dba, *v_acc, {*req.filter}, symbol, e_acc)) { - continue; - } - - std::optional<GetPropElement> collected_properties; - collected_properties = collect_props(*v_acc, req.property_ids, view, e_acc); - if (!collected_properties) { - return msgs::GetPropertiesResponse{.result = msgs::GetPropertiesResponse::FAILURE}; - } - if (collected_properties->ids.empty()) { - continue; - } - elements.push_back(std::move(*collected_properties)); - } - - if (!req.order_by.empty()) { - elements = OrderByElements(dba, req.order_by, std::move(elements)); - } - - std::vector<msgs::GetPropertiesResultRow> results; - results.reserve(elements.size()); - const auto has_expr_to_evaluate = !req.expressions.empty(); - size_t limit = elements.size(); - if (req.limit && *req.limit < elements.size()) { - limit = *req.limit; - } - for (size_t index = 0; index != limit; ++index) { - auto &element = elements.at(index); - const auto id = element.vertex_acc.Id(view).GetValue(); - std::optional<EdgeTypeId> e_type = - (element.edge_acc) ? std::make_optional(element.edge_acc->EdgeType()) : std::nullopt; - msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; - results.push_back(msgs::GetPropertiesResultRow{ - .vertex_and_edge = {.vertex = std::move(v_id), .edge = e_type}, - .properies_and_ids = { - .ids = std::move(element.ids), - .properties = ConvertToValueVectorFromTypedValueVector(std::move(element.properties_order_by))}}); - if (has_expr_to_evaluate) { - auto expression_results = ConvertToValueVectorFromTypedValueVector( - EvaluateVertexExpressions(dba, element.vertex_acc, req.expressions, expr::identifier_node_symbol)); - results.back().evaluated_expressions = std::move(expression_results); + auto emplace_result_row = + [dba, transform_props, collect_props, has_expr_to_evaluate, &req]( + const VertexAccessor &v_acc, + const std::optional<EdgeAccessor> e_acc) mutable -> ShardResult<msgs::GetPropertiesResultRow> { + auto maybe_id = v_acc.Id(view); + if (maybe_id.HasError()) { + return {maybe_id.GetError()}; } + const auto &id = maybe_id.GetValue(); + std::optional<msgs::EdgeId> e_type; + if (e_acc) { + e_type = msgs::EdgeId{e_acc->Gid().AsUint()}; + } + msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; + auto maybe_props = collect_props(v_acc, e_acc); + if (maybe_props.HasError()) { + return {maybe_props.GetError()}; + } + auto props = transform_props(std::move(maybe_props.GetValue())); + auto result = msgs::GetPropertiesResultRow{.vertex = std::move(v_id), .edge = e_type, .props = std::move(props)}; + if (has_expr_to_evaluate) { + std::vector<Value> e_results; + if (e_acc) { + e_results = + ConvertToValueVectorFromTypedValueVector(EvaluateEdgeExpressions(dba, v_acc, *e_acc, req.expressions)); + } else { + e_results = ConvertToValueVectorFromTypedValueVector( + EvaluateVertexExpressions(dba, v_acc, req.expressions, expr::identifier_node_symbol)); + } + result.evaluated_expressions = std::move(e_results); + } + return {std::move(result)}; + }; + + auto get_limit = [&req](const auto &elements) { + size_t limit = elements.size(); + if (req.limit && *req.limit < elements.size()) { + limit = *req.limit; + } + return limit; + }; + + auto collect_response = [get_limit, &req](auto &elements, auto result_row_functor) { + msgs::GetPropertiesResponse response; + const auto limit = get_limit(elements); + for (size_t index = 0; index != limit; ++index) { + auto result_row = result_row_functor(elements[index]); + if (result_row.HasError()) { + return msgs::GetPropertiesResponse{.error = CreateErrorResponse(result_row.GetError(), req.transaction_id, "")}; + } + response.result_row.push_back(std::move(result_row.GetValue())); + } + return response; + }; + + std::vector<VertexAccessor> vertices; + std::vector<EdgeAccessor> edges; + + auto parse_and_filter = [dba, &vertices](auto &cont, auto projection, auto filter, auto maybe_get_edge) mutable { + for (const auto &elem : cont) { + const auto &[label, pk_v] = projection(elem); + auto pk = ConvertPropertyVector(pk_v); + auto v_acc = dba.FindVertex(pk, view); + if (!v_acc || filter(*v_acc, maybe_get_edge(elem))) { + continue; + } + vertices.push_back({*v_acc}); + } + }; + auto identity = [](auto &elem) { return elem; }; + + auto filter_vertex = [dba, req](const auto &acc, const auto & /*edge*/) mutable { + if (!req.filter) { + return false; + } + return !FilterOnVertex(dba, acc, {*req.filter}); + }; + + auto filter_edge = [dba, &edges, &req, find_edge](const auto &acc, const auto &edge) mutable { + auto e_acc = find_edge(acc, edge); + if (!req.filter || !e_acc || !FilterOnEdge(dba, acc, *e_acc, {*req.filter})) { + return false; + } + edges.push_back(*e_acc); + return true; + }; + + // Handler logic here + if (!req.vertex_ids.empty()) { + parse_and_filter(req.vertex_ids, identity, filter_vertex, identity); + } else { + parse_and_filter( + req.vertices_and_edges, [](auto &e) { return e.first; }, filter_edge, [](auto &e) { return e.second; }); } - return msgs::GetPropertiesResponse{std::move(results), msgs::GetPropertiesResponse::SUCCESS}; + if (!req.vertex_ids.empty()) { + if (!req.order_by.empty()) { + auto elements = OrderByVertices(dba, vertices, req.order_by); + return collect_response(elements, [emplace_result_row](auto &element) mutable { + return emplace_result_row(element.object_acc, std::nullopt); + }); + } + return collect_response(vertices, + [emplace_result_row](auto &acc) mutable { return emplace_result_row(acc, std::nullopt); }); + } + if (!req.order_by.empty()) { + auto elements = OrderByEdges(dba, edges, req.order_by, vertices); + return collect_response(elements, [emplace_result_row](auto &element) mutable { + return emplace_result_row(element.object_acc.first, element.object_acc.second); + }); + } + + struct ZipView { + ZipView(std::vector<VertexAccessor> &v, std::vector<EdgeAccessor> &e) : v(v), e(e) {} + size_t size() const { return v.size(); } + auto operator[](size_t index) { return std::make_pair(v[index], e[index]); } + + private: + std::vector<VertexAccessor> &v; + std::vector<EdgeAccessor> &e; + }; + + ZipView vertices_and_edges(vertices, edges); + return collect_response(vertices_and_edges, [emplace_result_row](const auto &acc) mutable { + return emplace_result_row(acc.first, acc.second); + }); } -// TODO(kostasrim) Handle edges - } // namespace memgraph::storage::v3 diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 224a84090..80a5cd87d 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -482,7 +482,7 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression msgs::GetPropertiesResponse AttemptToGetProperties(ShardClient &client, std::vector<PropertyId> properties, std::vector<msgs::VertexId> vertices, - std::vector<msgs::EdgeTypeId> edges, + std::vector<msgs::EdgeId> edges, std::optional<size_t> limit = std::nullopt, std::optional<uint64_t> filter_prop = std::nullopt, bool edge = false, @@ -508,14 +508,25 @@ msgs::GetPropertiesResponse AttemptToGetProperties(ShardClient &client, std::vec req.limit = limit; } req.expressions = {std::string("5 = 5")}; - std::vector<msgs::VertexAndEdgeId> req_v; + std::vector<msgs::VertexId> req_v; + std::vector<msgs::EdgeId> req_e; for (auto &v : vertices) { - req_v.push_back(msgs::VertexAndEdgeId{.vertex = std::move(v)}); + req_v.push_back(std::move(v)); } - for (auto index = 0; index != edges.size(); ++index) { - req_v[index].edge = edges[index]; + for (auto &e : edges) { + req_e.push_back(std::move(e)); + } + + if (!edges.empty()) { + MG_ASSERT(edges.size() == vertices.size()); + size_t id = 0; + req.vertices_and_edges.reserve(req_v.size()); + for (auto &v : req_v) { + req.vertices_and_edges.push_back({std::move(v), std::move(req_e[id++])}); + } + } else { + req.vertex_ids = std::move(req_v); } - req.vertices_and_edges = std::move(req_v); while (true) { auto read_res = client.SendReadRequest(req); @@ -1285,93 +1296,86 @@ void TestGetProperties(ShardClient &client) { { // No properties const auto result = AttemptToGetProperties(client, {}, {v_id, v_id_2}, {}, std::nullopt, unique_prop_val_2); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.empty()); } { // All properties const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + MG_ASSERT(!result.error); MG_ASSERT(!result.result_row.empty()); MG_ASSERT(result.result_row.size() == 3); for (const auto &elem : result.result_row) { - MG_ASSERT(elem.properies_and_ids.ids.size() == 3); - MG_ASSERT(elem.properies_and_ids.properties.size() == 3); + MG_ASSERT(elem.props.size() == 3); } } { // Two properties from two vertices with a filter on unique_prop_5 const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {}, std::nullopt, unique_prop_val_5); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); } { // One property from three vertices. const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); - MG_ASSERT(result.result_row[0].properies_and_ids.ids.size() == 1); - MG_ASSERT(result.result_row[1].properies_and_ids.ids.size() == 1); - MG_ASSERT(result.result_row[2].properies_and_ids.ids.size() == 1); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[2].props.size() == 1); } { // Same as before but with limit of 1 row const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(1)); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); } { // Same as before but with a limit greater than the elements returned const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(5)); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); } { // Order by on `prop1` (descending) const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt, std::nullopt, false, "prop1"); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); - MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_3); - MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_2); - MG_ASSERT(result.result_row[2].vertex_and_edge.vertex == v_id); + MG_ASSERT(result.result_row[0].vertex == v_id_3); + MG_ASSERT(result.result_row[1].vertex == v_id_2); + MG_ASSERT(result.result_row[2].vertex == v_id); } { // Order by and filter on >= unique_prop_val_3 && assert result row data members const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5}, {}, std::nullopt, unique_prop_val_3, false, "prop1"); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); - MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_5); - MG_ASSERT(result.result_row[0].properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row[0].properies_and_ids.properties.front() == prim_key_5.front()); - MG_ASSERT(result.result_row[0].properies_and_ids.ids.size() == 1); - MG_ASSERT(result.result_row[0].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[0].vertex == v_id_5); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().second == prim_key_5.front()); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().first == prop_id_2); MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); - MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_4); - MG_ASSERT(result.result_row[1].properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row[1].properies_and_ids.properties.front() == prim_key_4.front()); - MG_ASSERT(result.result_row[1].properies_and_ids.ids.size() == 1); - MG_ASSERT(result.result_row[1].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[1].vertex == v_id_4); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().second == prim_key_4.front()); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().first == prop_id_2); MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); - MG_ASSERT(result.result_row[2].vertex_and_edge.vertex == v_id_3); - MG_ASSERT(result.result_row[2].properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row[2].properies_and_ids.properties.front() == prim_key_3.front()); - MG_ASSERT(result.result_row[2].properies_and_ids.ids.size() == 1); - MG_ASSERT(result.result_row[2].properies_and_ids.ids.front() == prop_id_2); + MG_ASSERT(result.result_row[2].vertex == v_id_3); + MG_ASSERT(result.result_row[2].props.size() == 1); + MG_ASSERT(result.result_row[2].props.front().second == prim_key_3.front()); + MG_ASSERT(result.result_row[2].props.size() == 1); + MG_ASSERT(result.result_row[2].props.front().first == prop_id_2); MG_ASSERT(result.result_row[2].evaluated_expressions.size() == 1); MG_ASSERT(result.result_row[2].evaluated_expressions.front() == msgs::Value(true)); } @@ -1388,54 +1392,49 @@ void TestGetProperties(ShardClient &client) { MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_3, unique_prop_val_4, edge_gid_2, unique_edge_prop_id, edge_prop_val_2, {edge_type_id})); const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id); + std::vector<msgs::EdgeId> edge_ids = {{edge_gid}, {edge_gid_2}}; // no properties { - const auto result = AttemptToGetProperties(client, {}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); + const auto result = AttemptToGetProperties(client, {}, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.empty()); } // properties for two vertices { - const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); } // filter { - const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}, - {}, {edge_prop_val}, true); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + const auto result = + AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {edge_prop_val}, true); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); - MG_ASSERT(result.result_row.front().vertex_and_edge.edge); - MG_ASSERT(result.result_row.front().vertex_and_edge.edge.value() == edge_type_id); - MG_ASSERT(result.result_row.front().properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row.front().properies_and_ids.properties.front() == - msgs::Value(static_cast<int64_t>(edge_prop_val))); + MG_ASSERT(result.result_row.front().edge); + MG_ASSERT(result.result_row.front().edge.value().gid == edge_gid); + MG_ASSERT(result.result_row.front().props.size() == 1); + MG_ASSERT(result.result_row.front().props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val))); } // Order by { - const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, {edge_type_id, edge_type_id}, - {}, {}, true, "e_prop"); - MG_ASSERT(result.result == msgs::GetPropertiesResponse::SUCCESS); - MG_ASSERT(!result.result_row.empty()); + const auto result = + AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {}, true, "e_prop"); + MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); - MG_ASSERT(result.result_row[0].vertex_and_edge.vertex == v_id_3); - MG_ASSERT(result.result_row[0].vertex_and_edge.edge); - MG_ASSERT(result.result_row[0].vertex_and_edge.edge.value() == edge_type_id); - MG_ASSERT(result.result_row[0].properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row[0].properies_and_ids.properties.front() == - msgs::Value(static_cast<int64_t>(edge_prop_val_2))); + MG_ASSERT(result.result_row[0].vertex == v_id_3); + MG_ASSERT(result.result_row[0].edge); + MG_ASSERT(result.result_row[0].edge.value().gid == edge_gid_2); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val_2))); MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); - MG_ASSERT(result.result_row[1].vertex_and_edge.vertex == v_id_2); - MG_ASSERT(result.result_row[1].vertex_and_edge.edge); - MG_ASSERT(result.result_row[1].vertex_and_edge.edge.value() == edge_type_id); - MG_ASSERT(result.result_row[1].properies_and_ids.properties.size() == 1); - MG_ASSERT(result.result_row[1].properies_and_ids.properties.front() == - msgs::Value(static_cast<int64_t>(edge_prop_val))); + MG_ASSERT(result.result_row[1].vertex == v_id_2); + MG_ASSERT(result.result_row[1].edge); + MG_ASSERT(result.result_row[1].edge.value().gid == edge_gid); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().second == msgs::Value(static_cast<int64_t>(edge_prop_val))); MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); } From 94ef57c459eb2d1919bfa015d81137f6007c1f93 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Wed, 30 Nov 2022 17:24:46 +0200 Subject: [PATCH 4/9] Fix small bugs --- src/storage/v3/request_helper.cpp | 3 +-- src/storage/v3/request_helper.hpp | 15 +++++------ src/storage/v3/shard_rsm.cpp | 43 +++++++++++++++++++------------ tests/simulation/shard_rsm.cpp | 21 ++++++++++----- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 260147a25..07ea99d9d 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -14,7 +14,6 @@ #include <iterator> #include <vector> -#include "pretty_print_ast_to_original_expression.hpp" #include "storage/v3/bindings/db_accessor.hpp" #include "storage/v3/bindings/pretty_print_ast_to_original_expression.hpp" #include "storage/v3/expr.hpp" @@ -245,7 +244,7 @@ ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const auto pks = PrimaryKeysFromAccessor(acc, view, schema); if (pks) { - ret.GetValue().merge(*pks); + ret.GetValue().merge(std::move(*pks)); } return ret; diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 13871d219..bbe4894e9 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -32,7 +32,7 @@ using EdgeFiller = using msgs::Value; template <typename T> -concept ObjectAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>; +concept OrderableObject = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>; inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { // in ordering null comes after everything else @@ -126,7 +126,7 @@ class TypedValueVectorCompare final { std::vector<Ordering> ordering_; }; -template <ObjectAccessor TObjectAccessor> +template <OrderableObject TObjectAccessor> struct Element { std::vector<TypedValue> properties_order_by; TObjectAccessor object_acc; @@ -164,9 +164,6 @@ std::vector<Element<VertexAccessor>> OrderByVertices(DbAccessor &dba, TIterable return ordered; } -template <typename T> -concept EdgeObjectAccessor = utils::SameAsAnyOf<T, EdgeAccessor, std::pair<VertexAccessor, EdgeAccessor>>; - std::vector<Element<EdgeAccessor>> OrderByEdges(DbAccessor &dba, std::vector<EdgeAccessor> &iterable, std::vector<msgs::OrderBy> &order_by_edges, const VertexAccessor &vertex_acc); @@ -198,9 +195,9 @@ std::vector<TypedValue> EvaluateEdgeExpressions(DbAccessor &dba, const VertexAcc const std::vector<std::string> &expressions); template <typename T> -concept TAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>; +concept PropertiesAccessor = utils::SameAsAnyOf<T, VertexAccessor, EdgeAccessor>; -template <typename TAccessor> +template <PropertiesAccessor TAccessor> ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(const TAccessor &acc, const std::vector<PropertyId> &props, View view) { @@ -222,7 +219,7 @@ ShardResult<std::map<PropertyId, Value>> CollectSpecificPropertiesFromAccessor(c ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema); namespace impl { -template <typename TAccessor> +template <PropertiesAccessor TAccessor> ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesImpl(const TAccessor &acc, View view) { std::map<PropertyId, Value> ret; auto props = acc.Properties(view); @@ -240,7 +237,7 @@ ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesImpl(const TAccesso } } // namespace impl -template <typename TAccessor> +template <PropertiesAccessor TAccessor> ShardResult<std::map<PropertyId, Value>> CollectAllPropertiesFromAccessor(const TAccessor &acc, View view) { return impl::CollectAllPropertiesImpl<TAccessor>(acc, view); } diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index b628127af..4ea874770 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -517,10 +517,13 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) { msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { if (!req.vertex_ids.empty() && !req.vertices_and_edges.empty()) { - auto error = CreateErrorResponse( - {common::ErrorCode::VERTEX_HAS_EDGES, std::experimental::source_location::current()}, req.transaction_id, ""); + auto shard_error = SHARD_ERROR(ErrorCode::NONEXISTENT_OBJECT); + auto error = CreateErrorResponse(shard_error, req.transaction_id, ""); return msgs::GetPropertiesResponse{.error = {}}; } + if (req.property_ids && req.property_ids->empty()) { + return {}; + } auto shard_acc = shard_->Access(req.transaction_id); auto dba = DbAccessor{&shard_acc}; @@ -535,8 +538,9 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { return result; }; - auto collect_props = [&req](const VertexAccessor &v_acc, const std::optional<EdgeAccessor> &e_acc) { - if (req.property_ids) { + auto collect_props = [&req](const VertexAccessor &v_acc, + const std::optional<EdgeAccessor> &e_acc) -> ShardResult<std::map<PropertyId, Value>> { + if (!req.property_ids) { if (e_acc) { return CollectAllPropertiesFromAccessor(*e_acc, view); } @@ -544,9 +548,9 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { } if (e_acc) { - return CollectSpecificPropertiesFromAccessor(v_acc, *req.property_ids, view); + return CollectSpecificPropertiesFromAccessor(*e_acc, *req.property_ids, view); } - return CollectSpecificPropertiesFromAccessor(*e_acc, *req.property_ids, view); + return CollectSpecificPropertiesFromAccessor(v_acc, *req.property_ids, view); }; auto find_edge = [](const VertexAccessor &v, msgs::EdgeId e) -> std::optional<EdgeAccessor> { @@ -577,9 +581,9 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { return {maybe_id.GetError()}; } const auto &id = maybe_id.GetValue(); - std::optional<msgs::EdgeId> e_type; + std::optional<msgs::EdgeId> e_id; if (e_acc) { - e_type = msgs::EdgeId{e_acc->Gid().AsUint()}; + e_id = msgs::EdgeId{e_acc->Gid().AsUint()}; } msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; auto maybe_props = collect_props(v_acc, e_acc); @@ -587,7 +591,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { return {maybe_props.GetError()}; } auto props = transform_props(std::move(maybe_props.GetValue())); - auto result = msgs::GetPropertiesResultRow{.vertex = std::move(v_id), .edge = e_type, .props = std::move(props)}; + auto result = msgs::GetPropertiesResultRow{.vertex = std::move(v_id), .edge = e_id, .props = std::move(props)}; if (has_expr_to_evaluate) { std::vector<Value> e_results; if (e_acc) { @@ -610,11 +614,11 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { return limit; }; - auto collect_response = [get_limit, &req](auto &elements, auto result_row_functor) { + auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) { msgs::GetPropertiesResponse response; const auto limit = get_limit(elements); for (size_t index = 0; index != limit; ++index) { - auto result_row = result_row_functor(elements[index]); + auto result_row = create_result_row(elements[index]); if (result_row.HasError()) { return msgs::GetPropertiesResponse{.error = CreateErrorResponse(result_row.GetError(), req.transaction_id, "")}; } @@ -626,15 +630,15 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { std::vector<VertexAccessor> vertices; std::vector<EdgeAccessor> edges; - auto parse_and_filter = [dba, &vertices](auto &cont, auto projection, auto filter, auto maybe_get_edge) mutable { - for (const auto &elem : cont) { + auto parse_and_filter = [dba, &vertices](auto &container, auto projection, auto filter, auto maybe_get_edge) mutable { + for (const auto &elem : container) { const auto &[label, pk_v] = projection(elem); auto pk = ConvertPropertyVector(pk_v); auto v_acc = dba.FindVertex(pk, view); if (!v_acc || filter(*v_acc, maybe_get_edge(elem))) { continue; } - vertices.push_back({*v_acc}); + vertices.push_back(*v_acc); } }; auto identity = [](auto &elem) { return elem; }; @@ -648,11 +652,15 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { auto filter_edge = [dba, &edges, &req, find_edge](const auto &acc, const auto &edge) mutable { auto e_acc = find_edge(acc, edge); - if (!req.filter || !e_acc || !FilterOnEdge(dba, acc, *e_acc, {*req.filter})) { - return false; + if (!e_acc) { + return true; + } + + if (req.filter && !FilterOnEdge(dba, acc, *e_acc, {*req.filter})) { + return true; } edges.push_back(*e_acc); - return true; + return false; }; // Handler logic here @@ -673,6 +681,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { return collect_response(vertices, [emplace_result_row](auto &acc) mutable { return emplace_result_row(acc, std::nullopt); }); } + if (!req.order_by.empty()) { auto elements = OrderByEdges(dba, edges, req.order_by, vertices); return collect_response(elements, [emplace_result_row](auto &element) mutable { diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 80a5cd87d..ba36157d0 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -489,7 +489,9 @@ msgs::GetPropertiesResponse AttemptToGetProperties(ShardClient &client, std::vec std::optional<std::string> order_by = std::nullopt) { msgs::GetPropertiesRequest req{}; req.transaction_id.logical_id = GetTransactionId(); - req.property_ids = std::move(properties); + if (!properties.empty()) { + req.property_ids = std::move(properties); + } if (filter_prop) { std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE.prop1 >= " : "MG_SYMBOL_EDGE.e_prop = "; @@ -1294,13 +1296,15 @@ void TestGetProperties(ShardClient &client) { const auto prop_id_5 = PropertyId::FromUint(5); // Vertices { - // No properties - const auto result = AttemptToGetProperties(client, {}, {v_id, v_id_2}, {}, std::nullopt, unique_prop_val_2); + const auto result = AttemptToGetProperties(client, {}, {v_id, v_id_2}, {}); MG_ASSERT(!result.error); - MG_ASSERT(result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 3); + } } { - // All properties + // Specific properties const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); MG_ASSERT(!result.error); MG_ASSERT(!result.result_row.empty()); @@ -1393,11 +1397,14 @@ void TestGetProperties(ShardClient &client) { unique_edge_prop_id, edge_prop_val_2, {edge_type_id})); const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id); std::vector<msgs::EdgeId> edge_ids = {{edge_gid}, {edge_gid_2}}; - // no properties + // all properties { const auto result = AttemptToGetProperties(client, {}, {v_id_2, v_id_3}, edge_ids); MG_ASSERT(!result.error); - MG_ASSERT(result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 1); + } } // properties for two vertices { From 8af635c8d70efce8482a5d8bd0c8174d76270b6f Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Wed, 30 Nov 2022 17:44:37 +0200 Subject: [PATCH 5/9] Fix clang-tidy warnings --- src/storage/v3/shard_rsm.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 4ea874770..e533008a1 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -533,7 +533,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { std::vector<std::pair<PropertyId, Value>> result; result.reserve(value.size()); for (auto &[id, val] : value) { - result.push_back({id, std::move(val)}); + result.emplace_back(std::make_pair(id, std::move(val))); } return result; }; From f8cbaaf362237da45683dcd5b35194ef63538047 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 1 Dec 2022 14:41:21 +0200 Subject: [PATCH 6/9] Allow requests with zero properties --- src/storage/v3/shard_rsm.cpp | 3 -- tests/simulation/shard_rsm.cpp | 77 ++++++++++++++++++++-------------- 2 files changed, 46 insertions(+), 34 deletions(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index e533008a1..639ffd6d8 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -521,9 +521,6 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { auto error = CreateErrorResponse(shard_error, req.transaction_id, ""); return msgs::GetPropertiesResponse{.error = {}}; } - if (req.property_ids && req.property_ids->empty()) { - return {}; - } auto shard_acc = shard_->Access(req.transaction_id); auto dba = DbAccessor{&shard_acc}; diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index ba36157d0..768217945 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -480,18 +480,14 @@ std::tuple<size_t, std::optional<msgs::VertexId>> AttemptToScanAllWithExpression } } -msgs::GetPropertiesResponse AttemptToGetProperties(ShardClient &client, std::vector<PropertyId> properties, - std::vector<msgs::VertexId> vertices, - std::vector<msgs::EdgeId> edges, - std::optional<size_t> limit = std::nullopt, - std::optional<uint64_t> filter_prop = std::nullopt, - bool edge = false, - std::optional<std::string> order_by = std::nullopt) { +msgs::GetPropertiesResponse AttemptToGetProperties( + ShardClient &client, std::optional<std::vector<PropertyId>> properties, std::vector<msgs::VertexId> vertices, + std::vector<msgs::EdgeId> edges, std::optional<size_t> limit = std::nullopt, + std::optional<uint64_t> filter_prop = std::nullopt, bool edge = false, + std::optional<std::string> order_by = std::nullopt) { msgs::GetPropertiesRequest req{}; req.transaction_id.logical_id = GetTransactionId(); - if (!properties.empty()) { - req.property_ids = std::move(properties); - } + req.property_ids = std::move(properties); if (filter_prop) { std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE.prop1 >= " : "MG_SYMBOL_EDGE.e_prop = "; @@ -1294,9 +1290,18 @@ void TestGetProperties(ShardClient &client) { const auto prop_id_2 = PropertyId::FromUint(2); const auto prop_id_4 = PropertyId::FromUint(4); const auto prop_id_5 = PropertyId::FromUint(5); - // Vertices + // No properties { - const auto result = AttemptToGetProperties(client, {}, {v_id, v_id_2}, {}); + const auto result = AttemptToGetProperties(client, {{}}, {v_id, v_id_2}, {}); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 0); + } + } + // All properties + { + const auto result = AttemptToGetProperties(client, std::nullopt, {v_id, v_id_2}, {}); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); for (const auto &elem : result.result_row) { @@ -1305,7 +1310,8 @@ void TestGetProperties(ShardClient &client) { } { // Specific properties - const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); + const auto result = + AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); MG_ASSERT(!result.error); MG_ASSERT(!result.result_row.empty()); MG_ASSERT(result.result_row.size() == 3); @@ -1315,14 +1321,14 @@ void TestGetProperties(ShardClient &client) { } { // Two properties from two vertices with a filter on unique_prop_5 - const auto result = AttemptToGetProperties(client, {prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {}, std::nullopt, - unique_prop_val_5); + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {}, + std::nullopt, unique_prop_val_5); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); } { // One property from three vertices. - const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}); + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); MG_ASSERT(result.result_row[0].props.size() == 1); @@ -1331,21 +1337,21 @@ void TestGetProperties(ShardClient &client) { } { // Same as before but with limit of 1 row - const auto result = - AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(1)); + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}, + std::make_optional<size_t>(1)); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); } { // Same as before but with a limit greater than the elements returned - const auto result = - AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::make_optional<size_t>(5)); + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, std::vector{v_id, v_id_2, v_id_3}, {}, + std::make_optional<size_t>(5)); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); } { // Order by on `prop1` (descending) - const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt, + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt, std::nullopt, false, "prop1"); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); @@ -1355,8 +1361,8 @@ void TestGetProperties(ShardClient &client) { } { // Order by and filter on >= unique_prop_val_3 && assert result row data members - const auto result = AttemptToGetProperties(client, {prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5}, {}, - std::nullopt, unique_prop_val_3, false, "prop1"); + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5}, + {}, std::nullopt, unique_prop_val_3, false, "prop1"); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 3); MG_ASSERT(result.result_row[0].vertex == v_id_5); @@ -1397,25 +1403,34 @@ void TestGetProperties(ShardClient &client) { unique_edge_prop_id, edge_prop_val_2, {edge_type_id})); const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id); std::vector<msgs::EdgeId> edge_ids = {{edge_gid}, {edge_gid_2}}; - // all properties + // No properties { - const auto result = AttemptToGetProperties(client, {}, {v_id_2, v_id_3}, edge_ids); + const auto result = AttemptToGetProperties(client, {{}}, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 0); + } + } + // All properties + { + const auto result = AttemptToGetProperties(client, std::nullopt, {v_id_2, v_id_3}, edge_ids); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); for (const auto &elem : result.result_row) { MG_ASSERT(elem.props.size() == 1); } } - // properties for two vertices + // Properties for two vertices { - const auto result = AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids); + const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); } - // filter + // Filter { - const auto result = - AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {edge_prop_val}, true); + const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, + {edge_prop_val}, true); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 1); MG_ASSERT(result.result_row.front().edge); @@ -1426,7 +1441,7 @@ void TestGetProperties(ShardClient &client) { // Order by { const auto result = - AttemptToGetProperties(client, {edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {}, true, "e_prop"); + AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {}, true, "e_prop"); MG_ASSERT(!result.error); MG_ASSERT(result.result_row.size() == 2); MG_ASSERT(result.result_row[0].vertex == v_id_3); From 2120645d6a2a7802b76d2dbb8a1fbaa81b53019c Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 1 Dec 2022 17:38:24 +0200 Subject: [PATCH 7/9] Remove dead code in request_router simulation test --- tests/simulation/request_router.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 526e6365b..e990c867e 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -160,13 +160,6 @@ void TestScanVertices(query::v2::RequestRouterInterface &request_router) { prop = result[1].GetProperty(msgs::PropertyId::FromUint(0)); MG_ASSERT(prop.int_v == 444); } - - // result = request_router.ScanVertices("test_label"); - // { - // MG_ASSERT(result.size() == 1); - // auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); - // MG_ASSERT(prop.int_v == 1); - // } } void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { From c15e75b48cea40e980706a44647753842e63fb61 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 1 Dec 2022 17:40:58 +0200 Subject: [PATCH 8/9] Remove old shard request manager header --- src/query/v2/shard_request_manager.hpp | 812 ------------------------- 1 file changed, 812 deletions(-) delete mode 100644 src/query/v2/shard_request_manager.hpp diff --git a/src/query/v2/shard_request_manager.hpp b/src/query/v2/shard_request_manager.hpp deleted file mode 100644 index 4db77e645..000000000 --- a/src/query/v2/shard_request_manager.hpp +++ /dev/null @@ -1,812 +0,0 @@ -// Copyright 2022 Memgraph Ltd. -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source -// License, and you may not use this file except in compliance with the Business Source License. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -#pragma once - -#include <chrono> -#include <deque> -#include <iostream> -#include <iterator> -#include <map> -#include <numeric> -#include <optional> -#include <random> -#include <set> -#include <stdexcept> -#include <thread> -#include <unordered_map> -#include <vector> - -#include "coordinator/coordinator.hpp" -#include "coordinator/coordinator_client.hpp" -#include "coordinator/coordinator_rsm.hpp" -#include "coordinator/shard_map.hpp" -#include "io/address.hpp" -#include "io/errors.hpp" -#include "io/rsm/raft.hpp" -#include "io/rsm/rsm_client.hpp" -#include "io/rsm/shard_rsm.hpp" -#include "io/simulator/simulator.hpp" -#include "io/simulator/simulator_transport.hpp" -#include "query/v2/accessors.hpp" -#include "query/v2/requests.hpp" -#include "storage/v3/id_types.hpp" -#include "storage/v3/value_conversions.hpp" -#include "utils/result.hpp" - -namespace memgraph::msgs { -template <typename TStorageClient> -class RsmStorageClientManager { - public: - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; - using LabelId = memgraph::storage::v3::LabelId; - RsmStorageClientManager() = default; - RsmStorageClientManager(const RsmStorageClientManager &) = delete; - RsmStorageClientManager(RsmStorageClientManager &&) = delete; - RsmStorageClientManager &operator=(const RsmStorageClientManager &) = delete; - RsmStorageClientManager &operator=(RsmStorageClientManager &&) = delete; - ~RsmStorageClientManager() = default; - - void AddClient(Shard key, TStorageClient client) { cli_cache_.emplace(std::move(key), std::move(client)); } - - bool Exists(const Shard &key) { return cli_cache_.contains(key); } - - void PurgeCache() { cli_cache_.clear(); } - - TStorageClient &GetClient(const Shard &key) { - auto it = cli_cache_.find(key); - MG_ASSERT(it != cli_cache_.end(), "Non-existing shard client"); - return it->second; - } - - private: - std::map<Shard, TStorageClient> cli_cache_; -}; - -template <typename TRequest> -struct ExecutionState { - using CompoundKey = memgraph::io::rsm::ShardRsmKey; - using Shard = memgraph::coordinator::Shard; - - enum State : int8_t { INITIALIZING, EXECUTING, COMPLETED }; - // label is optional because some operators can create/remove etc, vertices. These kind of requests contain the label - // on the request itself. - std::optional<std::string> label; - // CompoundKey is optional because some operators require to iterate over all the available keys - // of a shard. One example is ScanAll, where we only require the field label. - std::optional<CompoundKey> key; - // Transaction id to be filled by the ShardRequestManager implementation - memgraph::coordinator::Hlc transaction_id; - // Initialized by ShardRequestManager implementation. This vector is filled with the shards that - // the ShardRequestManager impl will send requests to. When a request to a shard exhausts it, meaning that - // it pulled all the requested data from the given Shard, it will be removed from the Vector. When the Vector becomes - // empty, it means that all of the requests have completed succefully. - // TODO(gvolfing) - // Maybe make this into a more complex object to be able to keep track of paginated resutls. E.g. instead of a vector - // of Shards make it into a std::vector<std::pair<Shard, PaginatedResultType>> (probably a struct instead of a pair) - // where PaginatedResultType is an enum signaling the progress on the given request. This way we can easily check if - // a partial response on a shard(if there is one) is finished and we can send off the request for the next batch. - std::vector<Shard> shard_cache; - // 1-1 mapping with `shard_cache`. - // A vector that tracks request metadata for each shard (For example, next_id for a ScanAll on Shard A) - std::vector<TRequest> requests; - State state = INITIALIZING; -}; - -class ShardRequestManagerInterface { - public: - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; - using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; - ShardRequestManagerInterface() = default; - ShardRequestManagerInterface(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface(ShardRequestManagerInterface &&) = delete; - ShardRequestManagerInterface &operator=(const ShardRequestManagerInterface &) = delete; - ShardRequestManagerInterface &&operator=(ShardRequestManagerInterface &&) = delete; - - virtual ~ShardRequestManagerInterface() = default; - - virtual void StartTransaction() = 0; - virtual void Commit() = 0; - virtual std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) = 0; - virtual std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) = 0; - virtual std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, - ExpandOneRequest request) = 0; - virtual std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, - std::vector<NewExpand> new_edges) = 0; - virtual std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, - GetPropertiesRequest request) = 0; - virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; - virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; - virtual storage::v3::LabelId NameToLabel(const std::string &name) const = 0; - virtual const std::string &PropertyToName(memgraph::storage::v3::PropertyId prop) const = 0; - virtual const std::string &LabelToName(memgraph::storage::v3::LabelId label) const = 0; - virtual const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId type) const = 0; - virtual bool IsPrimaryLabel(LabelId label) const = 0; - virtual bool IsPrimaryKey(LabelId primary_label, PropertyId property) const = 0; -}; - -// TODO(kostasrim)rename this class template -template <typename TTransport> -class ShardRequestManager : public ShardRequestManagerInterface { - public: - using StorageClient = - memgraph::coordinator::RsmClient<TTransport, WriteRequests, WriteResponses, ReadRequests, ReadResponses>; - using CoordinatorWriteRequests = memgraph::coordinator::CoordinatorWriteRequests; - using CoordinatorClient = memgraph::coordinator::CoordinatorClient<TTransport>; - using Address = memgraph::io::Address; - using Shard = memgraph::coordinator::Shard; - using ShardMap = memgraph::coordinator::ShardMap; - using CompoundKey = memgraph::coordinator::PrimaryKey; - using VertexAccessor = memgraph::query::v2::accessors::VertexAccessor; - using EdgeAccessor = memgraph::query::v2::accessors::EdgeAccessor; - ShardRequestManager(CoordinatorClient coord, memgraph::io::Io<TTransport> &&io) - : coord_cli_(std::move(coord)), io_(std::move(io)) {} - - ShardRequestManager(const ShardRequestManager &) = delete; - ShardRequestManager(ShardRequestManager &&) = delete; - ShardRequestManager &operator=(const ShardRequestManager &) = delete; - ShardRequestManager &operator=(ShardRequestManager &&) = delete; - - ~ShardRequestManager() override {} - - void StartTransaction() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; - CoordinatorWriteRequests write_req = req; - auto write_res = coord_cli_.SendWriteRequest(write_req); - if (write_res.HasError()) { - throw std::runtime_error("HLC request failed"); - } - auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response); - - // Transaction ID to be used later... - transaction_id_ = hlc_response.new_hlc; - - if (hlc_response.fresher_shard_map) { - shards_map_ = hlc_response.fresher_shard_map.value(); - SetUpNameIdMappers(); - } - } - - void Commit() override { - memgraph::coordinator::HlcRequest req{.last_shard_map_version = shards_map_.GetHlc()}; - CoordinatorWriteRequests write_req = req; - auto write_res = coord_cli_.SendWriteRequest(write_req); - if (write_res.HasError()) { - throw std::runtime_error("HLC request for commit failed"); - } - auto coordinator_write_response = write_res.GetValue(); - auto hlc_response = std::get<memgraph::coordinator::HlcResponse>(coordinator_write_response); - - if (hlc_response.fresher_shard_map) { - shards_map_ = hlc_response.fresher_shard_map.value(); - SetUpNameIdMappers(); - } - auto commit_timestamp = hlc_response.new_hlc; - - msgs::CommitRequest commit_req{.transaction_id = transaction_id_, .commit_timestamp = commit_timestamp}; - - for (const auto &[label, space] : shards_map_.label_spaces) { - for (const auto &[key, shard] : space.shards) { - auto &storage_client = GetStorageClientForShard(shard); - // TODO(kostasrim) Currently requests return the result directly. Adjust this when the API works MgFuture - // instead. - auto commit_response = storage_client.SendWriteRequest(commit_req); - // RETRY on timeouts? - // Sometimes this produces a timeout. Temporary solution is to use a while(true) as was done in shard_map test - if (commit_response.HasError()) { - throw std::runtime_error("Commit request timed out"); - } - WriteResponses write_response_variant = commit_response.GetValue(); - auto &response = std::get<CommitResponse>(write_response_variant); - if (response.error) { - throw std::runtime_error("Commit request did not succeed"); - } - } - } - } - - storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const override { - return shards_map_.GetEdgeTypeId(name).value(); - } - - storage::v3::PropertyId NameToProperty(const std::string &name) const override { - return shards_map_.GetPropertyId(name).value(); - } - - storage::v3::LabelId NameToLabel(const std::string &name) const override { - return shards_map_.GetLabelId(name).value(); - } - - const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { - return properties_.IdToName(id.AsUint()); - } - const std::string &LabelToName(memgraph::storage::v3::LabelId id) const override { - return labels_.IdToName(id.AsUint()); - } - const std::string &EdgeTypeToName(memgraph::storage::v3::EdgeTypeId id) const override { - return edge_types_.IdToName(id.AsUint()); - } - - bool IsPrimaryKey(LabelId primary_label, PropertyId property) const override { - const auto schema_it = shards_map_.schemas.find(primary_label); - MG_ASSERT(schema_it != shards_map_.schemas.end(), "Invalid primary label id: {}", primary_label.AsUint()); - - return std::find_if(schema_it->second.begin(), schema_it->second.end(), [property](const auto &schema_prop) { - return schema_prop.property_id == property; - }) != schema_it->second.end(); - } - - bool IsPrimaryLabel(LabelId label) const override { return shards_map_.label_spaces.contains(label); } - - // TODO(kostasrim) Simplify return result - std::vector<VertexAccessor> Request(ExecutionState<ScanVerticesRequest> &state) override { - MaybeInitializeExecutionState(state); - std::vector<ScanVerticesResponse> responses; - - SendAllRequests(state); - auto all_requests_gathered = [](auto &paginated_rsp_tracker) { - return std::ranges::all_of(paginated_rsp_tracker, [](const auto &state) { - return state.second == PaginatedResponseState::PartiallyFinished; - }); - }; - - std::map<Shard, PaginatedResponseState> paginated_response_tracker; - for (const auto &shard : state.shard_cache) { - paginated_response_tracker.insert(std::make_pair(shard, PaginatedResponseState::Pending)); - } - - do { - AwaitOnPaginatedRequests(state, responses, paginated_response_tracker); - } while (!all_requests_gathered(paginated_response_tracker)); - - MaybeCompleteState(state); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). - return PostProcess(std::move(responses)); - } - - std::vector<CreateVerticesResponse> Request(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) override { - MG_ASSERT(!new_vertices.empty()); - MaybeInitializeExecutionState(state, new_vertices); - std::vector<CreateVerticesResponse> responses; - auto &shard_cache_ref = state.shard_cache; - - // 1. Send the requests. - SendAllRequests(state, shard_cache_ref); - - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - - MaybeCompleteState(state); - // TODO(kostasrim) Before returning start prefetching the batch (this shall be done once we get MgFuture as return - // result of storage_client.SendReadRequest()). - return responses; - } - - std::vector<CreateExpandResponse> Request(ExecutionState<CreateExpandRequest> &state, - std::vector<NewExpand> new_edges) override { - MG_ASSERT(!new_edges.empty()); - MaybeInitializeExecutionState(state, new_edges); - std::vector<CreateExpandResponse> responses; - auto &shard_cache_ref = state.shard_cache; - size_t id{0}; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++id) { - auto &storage_client = GetStorageClientForShard(*shard_it); - WriteRequests req = state.requests[id]; - auto write_response_result = storage_client.SendWriteRequest(std::move(req)); - if (write_response_result.HasError()) { - throw std::runtime_error("CreateVertices request timedout"); - } - WriteResponses response_variant = write_response_result.GetValue(); - CreateExpandResponse mapped_response = std::get<CreateExpandResponse>(response_variant); - - if (mapped_response.error) { - throw std::runtime_error("CreateExpand request did not succeed"); - } - responses.push_back(mapped_response); - shard_it = shard_cache_ref.erase(shard_it); - } - // We are done with this state - MaybeCompleteState(state); - return responses; - } - - std::vector<ExpandOneResultRow> Request(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) override { - // TODO(kostasrim)Update to limit the batch size here - // Expansions of the destination must be handled by the caller. For example - // match (u:L1 { prop : 1 })-[:Friend]-(v:L1) - // For each vertex U, the ExpandOne will result in <U, Edges>. The destination vertex and its properties - // must be fetched again with an ExpandOne(Edges.dst) - MaybeInitializeExecutionState(state, std::move(request)); - std::vector<ExpandOneResponse> responses; - auto &shard_cache_ref = state.shard_cache; - - // 1. Send the requests. - SendAllRequests(state, shard_cache_ref); - - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - std::vector<ExpandOneResultRow> result_rows; - const auto total_row_count = std::accumulate( - responses.begin(), responses.end(), 0, - [](const int64_t partial_count, const ExpandOneResponse &resp) { return partial_count + resp.result.size(); }); - result_rows.reserve(total_row_count); - - for (auto &response : responses) { - result_rows.insert(result_rows.end(), std::make_move_iterator(response.result.begin()), - std::make_move_iterator(response.result.end())); - } - MaybeCompleteState(state); - return result_rows; - } - - std::vector<GetPropertiesResponse> Request(ExecutionState<GetPropertiesRequest> &state, - GetPropertiesRequest requests) override { - MaybeInitializeExecutionState(state, std::move(requests)); - SendAllRequests(state); - - std::vector<GetPropertiesResponse> responses; - // 2. Block untill all the futures are exhausted - do { - AwaitOnResponses(state, responses); - } while (!state.shard_cache.empty()); - - MaybeCompleteState(state); - return responses; - } - - private: - enum class PaginatedResponseState { Pending, PartiallyFinished }; - - std::vector<VertexAccessor> PostProcess(std::vector<ScanVerticesResponse> &&responses) const { - std::vector<VertexAccessor> accessors; - for (auto &response : responses) { - for (auto &result_row : response.results) { - accessors.emplace_back(VertexAccessor(std::move(result_row.vertex), std::move(result_row.props), this)); - } - } - return accessors; - } - - template <typename ExecutionState> - void ThrowIfStateCompleted(ExecutionState &state) const { - if (state.state == ExecutionState::COMPLETED) [[unlikely]] { - throw std::runtime_error("State is completed and must be reset"); - } - } - - template <typename ExecutionState> - void ThrowIfStateExecuting(ExecutionState &state) const { - if (state.state == ExecutionState::EXECUTING) [[unlikely]] { - throw std::runtime_error("State is completed and must be reset"); - } - } - - template <typename ExecutionState> - void MaybeCompleteState(ExecutionState &state) const { - if (state.requests.empty()) { - state.state = ExecutionState::COMPLETED; - } - } - - template <typename ExecutionState> - bool ShallNotInitializeState(ExecutionState &state) const { - return state.state != ExecutionState::INITIALIZING; - } - - void MaybeInitializeExecutionState(ExecutionState<CreateVerticesRequest> &state, - std::vector<NewVertex> new_vertices) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, CreateVerticesRequest> per_shard_request_table; - - for (auto &new_vertex : new_vertices) { - MG_ASSERT(!new_vertex.label_ids.empty(), "This is error!"); - auto shard = shards_map_.GetShardForKey(new_vertex.label_ids[0].id, - storage::conversions::ConvertPropertyVector(new_vertex.primary_key)); - if (!per_shard_request_table.contains(shard)) { - CreateVerticesRequest create_v_rqst{.transaction_id = transaction_id_}; - per_shard_request_table.insert(std::pair(shard, std::move(create_v_rqst))); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].new_vertices.push_back(std::move(new_vertex)); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<CreateVerticesRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<CreateExpandRequest> &state, std::vector<NewExpand> new_expands) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, CreateExpandRequest> per_shard_request_table; - auto ensure_shard_exists_in_table = [&per_shard_request_table, - transaction_id = transaction_id_](const Shard &shard) { - if (!per_shard_request_table.contains(shard)) { - CreateExpandRequest create_expand_request{.transaction_id = transaction_id}; - per_shard_request_table.insert({shard, std::move(create_expand_request)}); - } - }; - - for (auto &new_expand : new_expands) { - const auto shard_src_vertex = shards_map_.GetShardForKey( - new_expand.src_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.src_vertex.second)); - const auto shard_dest_vertex = shards_map_.GetShardForKey( - new_expand.dest_vertex.first.id, storage::conversions::ConvertPropertyVector(new_expand.dest_vertex.second)); - - ensure_shard_exists_in_table(shard_src_vertex); - - if (shard_src_vertex != shard_dest_vertex) { - ensure_shard_exists_in_table(shard_dest_vertex); - per_shard_request_table[shard_dest_vertex].new_expands.push_back(new_expand); - } - per_shard_request_table[shard_src_vertex].new_expands.push_back(std::move(new_expand)); - } - - for (auto &[shard, request] : per_shard_request_table) { - state.shard_cache.push_back(shard); - state.requests.push_back(std::move(request)); - } - state.state = ExecutionState<CreateExpandRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<ScanVerticesRequest> &state) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - - std::vector<coordinator::Shards> multi_shards; - state.transaction_id = transaction_id_; - if (!state.label) { - multi_shards = shards_map_.GetAllShards(); - } else { - const auto label_id = shards_map_.GetLabelId(*state.label); - MG_ASSERT(label_id); - MG_ASSERT(IsPrimaryLabel(*label_id)); - multi_shards = {shards_map_.GetShardsForLabel(*state.label)}; - } - for (auto &shards : multi_shards) { - for (auto &[key, shard] : shards) { - MG_ASSERT(!shard.empty()); - state.shard_cache.push_back(std::move(shard)); - ScanVerticesRequest rqst; - rqst.transaction_id = transaction_id_; - rqst.start_id.second = storage::conversions::ConvertValueVector(key); - state.requests.push_back(std::move(rqst)); - } - } - state.state = ExecutionState<ScanVerticesRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<ExpandOneRequest> &state, ExpandOneRequest request) { - ThrowIfStateCompleted(state); - if (ShallNotInitializeState(state)) { - return; - } - state.transaction_id = transaction_id_; - - std::map<Shard, ExpandOneRequest> per_shard_request_table; - auto top_level_rqst_template = request; - top_level_rqst_template.transaction_id = transaction_id_; - top_level_rqst_template.src_vertices.clear(); - state.requests.clear(); - for (auto &vertex : request.src_vertices) { - auto shard = - shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); - if (!per_shard_request_table.contains(shard)) { - per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].src_vertices.push_back(vertex); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<ExpandOneRequest>::EXECUTING; - } - - void MaybeInitializeExecutionState(ExecutionState<GetPropertiesRequest> &state, GetPropertiesRequest request) { - ThrowIfStateCompleted(state); - ThrowIfStateExecuting(state); - - std::map<Shard, GetPropertiesRequest> per_shard_request_table; - auto top_level_rqst_template = request; - top_level_rqst_template.transaction_id = transaction_id_; - top_level_rqst_template.vertices_and_edges.clear(); - - state.transaction_id = transaction_id_; - - for (auto &[vertex, maybe_edge] : request.vertices_and_edges) { - auto shard = - shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); - if (!per_shard_request_table.contains(shard)) { - per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); - state.shard_cache.push_back(shard); - } - per_shard_request_table[shard].vertices_and_edges.push_back({std::move(vertex), maybe_edge}); - } - - for (auto &[shard, rqst] : per_shard_request_table) { - state.requests.push_back(std::move(rqst)); - } - state.state = ExecutionState<GetPropertiesRequest>::EXECUTING; - } - - StorageClient &GetStorageClientForShard(Shard shard) { - if (!storage_cli_manager_.Exists(shard)) { - AddStorageClientToManager(shard); - } - return storage_cli_manager_.GetClient(shard); - } - - StorageClient &GetStorageClientForShard(const std::string &label, const CompoundKey &key) { - auto shard = shards_map_.GetShardForKey(label, key); - return GetStorageClientForShard(std::move(shard)); - } - - void AddStorageClientToManager(Shard target_shard) { - MG_ASSERT(!target_shard.empty()); - auto leader_addr = target_shard.front(); - std::vector<Address> addresses; - addresses.reserve(target_shard.size()); - for (auto &address : target_shard) { - addresses.push_back(std::move(address.address)); - } - auto cli = StorageClient(io_, std::move(leader_addr.address), std::move(addresses)); - storage_cli_manager_.AddClient(target_shard, std::move(cli)); - } - - template <typename TRequest> - void SendAllRequests(ExecutionState<TRequest> &state) { - int64_t shard_idx = 0; - for (const auto &request : state.requests) { - const auto ¤t_shard = state.shard_cache[shard_idx]; - - auto &storage_client = GetStorageClientForShard(current_shard); - ReadRequests req = request; - storage_client.SendAsyncReadRequest(request); - - ++shard_idx; - } - } - - void SendAllRequests(ExecutionState<CreateVerticesRequest> &state, - std::vector<memgraph::coordinator::Shard> &shard_cache_ref) { - size_t id = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { - // This is fine because all new_vertices of each request end up on the same shard - const auto labels = state.requests[id].new_vertices[0].label_ids; - auto req_deep_copy = state.requests[id]; - - for (auto &new_vertex : req_deep_copy.new_vertices) { - new_vertex.label_ids.erase(new_vertex.label_ids.begin()); - } - - auto &storage_client = GetStorageClientForShard(*shard_it); - - WriteRequests req = req_deep_copy; - storage_client.SendAsyncWriteRequest(req); - ++id; - } - } - - void SendAllRequests(ExecutionState<ExpandOneRequest> &state, - std::vector<memgraph::coordinator::Shard> &shard_cache_ref) { - size_t id = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end(); ++shard_it) { - auto &storage_client = GetStorageClientForShard(*shard_it); - ReadRequests req = state.requests[id]; - storage_client.SendAsyncReadRequest(req); - ++id; - } - } - - void AwaitOnResponses(ExecutionState<CreateVerticesRequest> &state, std::vector<CreateVerticesResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.AwaitAsyncWriteRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("CreateVertices request timed out"); - } - - WriteResponses response_variant = poll_result->GetValue(); - auto response = std::get<CreateVerticesResponse>(response_variant); - - if (response.error) { - throw std::runtime_error("CreateVertices request did not succeed"); - } - responses.push_back(response); - - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnResponses(ExecutionState<ExpandOneRequest> &state, std::vector<ExpandOneResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.PollAsyncReadRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("ExpandOne request timed out"); - } - - ReadResponses response_variant = poll_result->GetValue(); - auto response = std::get<ExpandOneResponse>(response_variant); - // -NOTE- - // Currently a boolean flag for signaling the overall success of the - // ExpandOne request does not exist. But it should, so here we assume - // that it is already in place. - if (response.error) { - throw std::runtime_error("ExpandOne request did not succeed"); - } - - responses.push_back(std::move(response)); - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnResponses(ExecutionState<GetPropertiesRequest> &state, std::vector<GetPropertiesResponse> &responses) { - auto &shard_cache_ref = state.shard_cache; - int64_t request_idx = 0; - - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto poll_result = storage_client.PollAsyncReadRequest(); - if (!poll_result) { - ++shard_it; - ++request_idx; - continue; - } - - if (poll_result->HasError()) { - throw std::runtime_error("GetProperties request timed out"); - } - - ReadResponses response_variant = poll_result->GetValue(); - auto response = std::get<GetPropertiesResponse>(response_variant); - if (response.result != GetPropertiesResponse::SUCCESS) { - throw std::runtime_error("GetProperties request did not succeed"); - } - - responses.push_back(std::move(response)); - shard_it = shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - } - } - - void AwaitOnPaginatedRequests(ExecutionState<ScanVerticesRequest> &state, - std::vector<ScanVerticesResponse> &responses, - std::map<Shard, PaginatedResponseState> &paginated_response_tracker) { - auto &shard_cache_ref = state.shard_cache; - - // Find the first request that is not holding a paginated response. - int64_t request_idx = 0; - for (auto shard_it = shard_cache_ref.begin(); shard_it != shard_cache_ref.end();) { - if (paginated_response_tracker.at(*shard_it) != PaginatedResponseState::Pending) { - ++shard_it; - ++request_idx; - continue; - } - - auto &storage_client = GetStorageClientForShard(*shard_it); - - auto await_result = storage_client.AwaitAsyncReadRequest(); - - if (!await_result) { - // Redirection has occured. - ++shard_it; - ++request_idx; - continue; - } - - if (await_result->HasError()) { - throw std::runtime_error("ScanAll request timed out"); - } - - ReadResponses read_response_variant = await_result->GetValue(); - auto response = std::get<ScanVerticesResponse>(read_response_variant); - if (response.error) { - throw std::runtime_error("ScanAll request did not succeed"); - } - - if (!response.next_start_id) { - paginated_response_tracker.erase((*shard_it)); - shard_cache_ref.erase(shard_it); - // Needed to maintain the 1-1 mapping between the ShardCache and the requests. - auto it = state.requests.begin() + request_idx; - state.requests.erase(it); - - } else { - state.requests[request_idx].start_id.second = response.next_start_id->second; - paginated_response_tracker[*shard_it] = PaginatedResponseState::PartiallyFinished; - } - responses.push_back(std::move(response)); - } - } - - void SetUpNameIdMappers() { - std::unordered_map<uint64_t, std::string> id_to_name; - for (const auto &[name, id] : shards_map_.labels) { - id_to_name.emplace(id.AsUint(), name); - } - labels_.StoreMapping(std::move(id_to_name)); - id_to_name.clear(); - for (const auto &[name, id] : shards_map_.properties) { - id_to_name.emplace(id.AsUint(), name); - } - properties_.StoreMapping(std::move(id_to_name)); - id_to_name.clear(); - for (const auto &[name, id] : shards_map_.edge_types) { - id_to_name.emplace(id.AsUint(), name); - } - edge_types_.StoreMapping(std::move(id_to_name)); - } - - ShardMap shards_map_; - storage::v3::NameIdMapper properties_; - storage::v3::NameIdMapper edge_types_; - storage::v3::NameIdMapper labels_; - CoordinatorClient coord_cli_; - RsmStorageClientManager<StorageClient> storage_cli_manager_; - memgraph::io::Io<TTransport> io_; - memgraph::coordinator::Hlc transaction_id_; - // TODO(kostasrim) Add batch prefetching -}; -} // namespace memgraph::msgs From 0ad702175fc6217d76e3229f19722811c1f3020a Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis <kostaskyrim@gmail.com> Date: Thu, 1 Dec 2022 18:24:51 +0200 Subject: [PATCH 9/9] Fix expression evaluator mocked request router --- tests/unit/query_v2_expression_evaluator.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index 5f77ed4e7..f819ee16f 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -51,6 +51,8 @@ using memgraph::msgs::CreateVerticesResponse; using memgraph::msgs::ExpandOneRequest; using memgraph::msgs::ExpandOneResponse; using memgraph::msgs::ExpandOneResultRow; +using memgraph::msgs::GetPropertiesRequest; +using memgraph::msgs::GetPropertiesResultRow; using memgraph::msgs::NewExpand; using memgraph::msgs::NewVertex; using memgraph::msgs::ScanVerticesRequest; @@ -92,6 +94,8 @@ class MockedRequestRouter : public RequestRouterInterface { std::vector<CreateExpandResponse> CreateExpand(std::vector<NewExpand> new_edges) override { return {}; } + std::vector<GetPropertiesResultRow> GetProperties(GetPropertiesRequest rqst) override { return {}; } + const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { return properties_.IdToName(id.AsUint()); }