diff --git a/src/query/v2/request_router.hpp b/src/query/v2/request_router.hpp index 996272fdc..da2c5bf59 100644 --- a/src/query/v2/request_router.hpp +++ b/src/query/v2/request_router.hpp @@ -11,6 +11,7 @@ #pragma once +#include #include #include #include @@ -112,6 +113,7 @@ class RequestRouterInterface { virtual std::vector CreateVertices(std::vector new_vertices) = 0; virtual std::vector ExpandOne(msgs::ExpandOneRequest request) = 0; virtual std::vector CreateExpand(std::vector new_edges) = 0; + virtual std::vector GetProperties(msgs::GetPropertiesRequest request) = 0; virtual storage::v3::EdgeTypeId NameToEdgeType(const std::string &name) const = 0; virtual storage::v3::PropertyId NameToProperty(const std::string &name) const = 0; @@ -367,6 +369,28 @@ class RequestRouter : public RequestRouterInterface { return result_rows; } + std::vector GetProperties(msgs::GetPropertiesRequest requests) override { + ExecutionState state = {}; + InitializeExecutionState(state, std::move(requests)); + for (auto &request : state.requests) { + auto &storage_client = GetStorageClientForShard(request.shard); + msgs::ReadRequests req = request.request; + request.async_request_token = storage_client.SendAsyncReadRequest(req); + } + + std::vector responses; + do { + DriveReadResponses(state, responses); + } while (!state.requests.empty()); + + std::vector result; + for (auto &res : responses) { + std::move(res.result_row.begin(), res.result_row.end(), std::back_inserter(result)); + } + + return result; + } + std::optional MaybeNameToProperty(const std::string &name) const override { return shards_map_.GetPropertyId(name); } @@ -503,6 +527,44 @@ class RequestRouter : public RequestRouterInterface { } } + void InitializeExecutionState(ExecutionState &state, msgs::GetPropertiesRequest request) { + std::map per_shard_request_table; + auto top_level_rqst_template = request; + top_level_rqst_template.transaction_id = transaction_id_; + top_level_rqst_template.vertex_ids.clear(); + top_level_rqst_template.vertices_and_edges.clear(); + + state.transaction_id = transaction_id_; + + for (auto &vertex : request.vertex_ids) { + auto shard = + shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); + if (!per_shard_request_table.contains(shard)) { + per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); + } + per_shard_request_table[shard].vertex_ids.emplace_back(std::move(vertex)); + } + + for (auto &[vertex, maybe_edge] : request.vertices_and_edges) { + auto shard = + shards_map_.GetShardForKey(vertex.first.id, storage::conversions::ConvertPropertyVector(vertex.second)); + if (!per_shard_request_table.contains(shard)) { + per_shard_request_table.insert(std::pair(shard, top_level_rqst_template)); + } + per_shard_request_table[shard].vertices_and_edges.emplace_back(std::move(vertex), maybe_edge); + } + + for (auto &[shard, rqst] : per_shard_request_table) { + ShardRequestState shard_request_state{ + .shard = shard, + .request = std::move(rqst), + .async_request_token = std::nullopt, + }; + + state.requests.emplace_back(std::move(shard_request_state)); + } + } + StorageClient &GetStorageClientForShard(Shard shard) { if (!storage_cli_manager_.Exists(shard)) { AddStorageClientToManager(shard); diff --git a/src/query/v2/requests.hpp b/src/query/v2/requests.hpp index 491d6cf72..9ff9a1bae 100644 --- a/src/query/v2/requests.hpp +++ b/src/query/v2/requests.hpp @@ -327,10 +327,6 @@ struct Expression { std::string expression; }; -struct Filter { - std::string filter_expression; -}; - enum class OrderingDirection { ASCENDING = 1, DESCENDING = 2 }; struct OrderBy { @@ -372,21 +368,32 @@ struct ScanVerticesResponse { std::vector results; }; -using VertexOrEdgeIds = std::variant; - struct GetPropertiesRequest { Hlc transaction_id; - // Shouldn't contain mixed vertex and edge ids - VertexOrEdgeIds vertex_or_edge_ids; - std::vector property_ids; - std::vector expressions; - bool only_unique = false; - std::optional> order_by; + std::vector vertex_ids; + std::vector> vertices_and_edges; + + std::optional> property_ids; + std::vector expressions; + + std::vector order_by; std::optional limit; - std::optional filter; + + // Return only the properties of the vertices or edges that the filter predicate + // evaluates to true + std::optional filter; +}; + +struct GetPropertiesResultRow { + VertexId vertex; + std::optional edge; + + std::vector> props; + std::vector evaluated_expressions; }; struct GetPropertiesResponse { + std::vector result_row; std::optional error; }; diff --git a/src/storage/v3/request_helper.cpp b/src/storage/v3/request_helper.cpp index 3b0c18326..6b889fe16 100644 --- a/src/storage/v3/request_helper.cpp +++ b/src/storage/v3/request_helper.cpp @@ -11,6 +11,7 @@ #include "storage/v3/request_helper.hpp" +#include #include #include "storage/v3/bindings/db_accessor.hpp" @@ -220,30 +221,39 @@ std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexA return evaluated_expressions; } +std::vector EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector &expressions) { + std::vector evaluated_expressions; + evaluated_expressions.reserve(expressions.size()); + + std::transform(expressions.begin(), expressions.end(), std::back_inserter(evaluated_expressions), + [&dba, &v_acc, &e_acc](const auto &expression) { + return ComputeExpression(dba, v_acc, e_acc, expression, expr::identifier_node_symbol, + expr::identifier_edge_symbol); + }); + + return evaluated_expressions; +} + ShardResult> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema) { - std::map ret; - auto props = acc.Properties(view); - if (props.HasError()) { - spdlog::debug("Encountered an error while trying to get vertex properties."); - return props.GetError(); + auto ret = impl::CollectAllPropertiesImpl(acc, view); + if (ret.HasError()) { + return ret.GetError(); } - auto &properties = props.GetValue(); - std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), - [](std::pair &pair) { - return std::make_pair(pair.first, FromPropertyValueToValue(std::move(pair.second))); - }); - properties.clear(); - auto pks = PrimaryKeysFromAccessor(acc, view, schema); if (pks) { - ret.merge(*pks); + ret.GetValue().merge(std::move(*pks)); } return ret; } +ShardResult> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view) { + return impl::CollectAllPropertiesImpl(acc, view); +} + EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows) { // Functions to select connecting edges based on uniquness EdgeUniquenessFunction maybe_filter_based_on_edge_uniquness; @@ -350,11 +360,20 @@ EdgeFiller InitializeEdgeFillerFunction(const msgs::ExpandOneRequest &req) { return edge_filler; } -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, - const std::string_view node_name) { - return std::ranges::all_of(filters, [&node_name, &dba, &v_acc](const auto &filter_expr) { - auto res = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, node_name, ""); - return res.IsBool() && res.ValueBool(); +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, + const std::vector &filters) { + return std::ranges::all_of(filters, [&dba, &v_acc](const auto &filter_expr) { + const auto result = ComputeExpression(dba, v_acc, std::nullopt, filter_expr, expr::identifier_node_symbol, ""); + return result.IsBool() && result.ValueBool(); + }); +} + +bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector &filters) { + return std::ranges::all_of(filters, [&dba, &v_acc, &e_acc](const auto &filter_expr) { + const auto result = + ComputeExpression(dba, v_acc, e_acc, filter_expr, expr::identifier_node_symbol, expr::identifier_edge_symbol); + return result.IsBool() && result.ValueBool(); }); } @@ -526,4 +545,36 @@ std::vector> OrderByEdges(DbAccessor &dba, std::vector>> OrderByEdges( + DbAccessor &dba, std::vector &iterable, std::vector &order_by_edges, + const std::vector &vertex_acc) { + MG_ASSERT(vertex_acc.size() == iterable.size()); + std::vector ordering; + ordering.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(ordering), + [](const auto &order_by) { return ConvertMsgsOrderByToOrdering(order_by.direction); }); + + std::vector>> ordered; + VertexAccessor current = vertex_acc.front(); + size_t id = 0; + for (auto it = iterable.begin(); it != iterable.end(); it++, id++) { + current = vertex_acc[id]; + std::vector properties_order_by; + properties_order_by.reserve(order_by_edges.size()); + std::transform(order_by_edges.begin(), order_by_edges.end(), std::back_inserter(properties_order_by), + [&dba, it, current](const auto &order_by) { + return ComputeExpression(dba, current, *it, order_by.expression.expression, + expr::identifier_node_symbol, expr::identifier_edge_symbol); + }); + + ordered.push_back({std::move(properties_order_by), {current, *it}}); + } + + auto compare_typed_values = TypedValueVectorCompare(ordering); + std::sort(ordered.begin(), ordered.end(), [compare_typed_values](const auto &pair1, const auto &pair2) { + return compare_typed_values(pair1.properties_order_by, pair2.properties_order_by); + }); + return ordered; +} + } // namespace memgraph::storage::v3 diff --git a/src/storage/v3/request_helper.hpp b/src/storage/v3/request_helper.hpp index 1e8b7a108..bbe4894e9 100644 --- a/src/storage/v3/request_helper.hpp +++ b/src/storage/v3/request_helper.hpp @@ -20,6 +20,7 @@ #include "storage/v3/edge_accessor.hpp" #include "storage/v3/expr.hpp" #include "storage/v3/shard.hpp" +#include "storage/v3/value_conversions.hpp" #include "storage/v3/vertex_accessor.hpp" #include "utils/template_utils.hpp" @@ -31,7 +32,7 @@ using EdgeFiller = using msgs::Value; template -concept ObjectAccessor = utils::SameAsAnyOf; +concept OrderableObject = utils::SameAsAnyOf>; inline bool TypedValueCompare(const TypedValue &a, const TypedValue &b) { // in ordering null comes after everything else @@ -125,7 +126,7 @@ class TypedValueVectorCompare final { std::vector ordering_; }; -template +template struct Element { std::vector properties_order_by; TObjectAccessor object_acc; @@ -167,6 +168,10 @@ std::vector> OrderByEdges(DbAccessor &dba, std::vector &order_by_edges, const VertexAccessor &vertex_acc); +std::vector>> OrderByEdges( + DbAccessor &dba, std::vector &iterable, std::vector &order_by_edges, + const std::vector &vertex_acc); + VerticesIterable::Iterator GetStartVertexIterator(VerticesIterable &vertex_iterable, const std::vector &primary_key, View view); @@ -177,19 +182,65 @@ std::vector>::const_iterator GetStartOrderedElementsIter std::array, 2> GetEdgesFromVertex(const VertexAccessor &vertex_accessor, msgs::EdgeDirection direction); -bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters, - std::string_view node_name); +bool FilterOnVertex(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const std::vector &filters); + +bool FilterOnEdge(DbAccessor &dba, const storage::v3::VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector &filters); std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const std::vector &expressions, std::string_view node_name); -ShardResult> CollectSpecificPropertiesFromAccessor(const VertexAccessor &acc, +std::vector EvaluateEdgeExpressions(DbAccessor &dba, const VertexAccessor &v_acc, const EdgeAccessor &e_acc, + const std::vector &expressions); + +template +concept PropertiesAccessor = utils::SameAsAnyOf; + +template +ShardResult> CollectSpecificPropertiesFromAccessor(const TAccessor &acc, const std::vector &props, - View view); + View view) { + std::map ret; + + for (const auto &prop : props) { + auto result = acc.GetProperty(prop, view); + if (result.HasError()) { + spdlog::debug("Encountered an Error while trying to get a vertex property."); + return result.GetError(); + } + auto &value = result.GetValue(); + ret.emplace(std::make_pair(prop, FromPropertyValueToValue(std::move(value)))); + } + + return ret; +} ShardResult> CollectAllPropertiesFromAccessor(const VertexAccessor &acc, View view, const Schemas::Schema &schema); +namespace impl { +template +ShardResult> CollectAllPropertiesImpl(const TAccessor &acc, View view) { + std::map ret; + auto props = acc.Properties(view); + if (props.HasError()) { + spdlog::debug("Encountered an error while trying to get vertex properties."); + return props.GetError(); + } + + auto &properties = props.GetValue(); + std::transform(properties.begin(), properties.end(), std::inserter(ret, ret.begin()), + [](std::pair &pair) { + return std::make_pair(pair.first, conversions::FromPropertyValueToValue(std::move(pair.second))); + }); + return ret; +} +} // namespace impl + +template +ShardResult> CollectAllPropertiesFromAccessor(const TAccessor &acc, View view) { + return impl::CollectAllPropertiesImpl(acc, view); +} EdgeUniquenessFunction InitializeEdgeUniquenessFunction(bool only_unique_neighbor_rows); diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 1548b56c7..639ffd6d8 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -10,12 +10,16 @@ // licenses/APL.txt. #include +#include +#include #include #include #include #include #include +#include +#include "common/errors.hpp" #include "parser/opencypher/parser.hpp" #include "query/v2/requests.hpp" #include "storage/v2/vertex.hpp" @@ -29,6 +33,7 @@ #include "storage/v3/bindings/symbol_generator.hpp" #include "storage/v3/bindings/symbol_table.hpp" #include "storage/v3/bindings/typed_value.hpp" +#include "storage/v3/conversions.hpp" #include "storage/v3/expr.hpp" #include "storage/v3/id_types.hpp" #include "storage/v3/key_store.hpp" @@ -326,7 +331,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ScanVerticesRequest &&req) { std::vector expression_results; if (!req.filter_expressions.empty()) { // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions, expr::identifier_node_symbol); + const bool eval = FilterOnVertex(dba, vertex, req.filter_expressions); if (!eval) { return; } @@ -431,7 +436,7 @@ msgs::ReadResponses ShardRsm::HandleRead(msgs::ExpandOneRequest &&req) { } if (!req.filters.empty()) { // NOTE - DbAccessor might get removed in the future. - const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters, expr::identifier_node_symbol); + const bool eval = FilterOnVertex(dba, src_vertex_acc_opt.value(), req.filters); if (!eval) { continue; } @@ -510,9 +515,191 @@ msgs::WriteResponses ShardRsm::ApplyWrite(msgs::CommitRequest &&req) { return msgs::CommitResponse{}; }; -// NOLINTNEXTLINE(readability-convert-member-functions-to-static) -msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest && /*req*/) { - return msgs::GetPropertiesResponse{}; +msgs::ReadResponses ShardRsm::HandleRead(msgs::GetPropertiesRequest &&req) { + if (!req.vertex_ids.empty() && !req.vertices_and_edges.empty()) { + auto shard_error = SHARD_ERROR(ErrorCode::NONEXISTENT_OBJECT); + auto error = CreateErrorResponse(shard_error, req.transaction_id, ""); + return msgs::GetPropertiesResponse{.error = {}}; + } + + auto shard_acc = shard_->Access(req.transaction_id); + auto dba = DbAccessor{&shard_acc}; + const auto view = storage::v3::View::NEW; + + auto transform_props = [](std::map &&value) { + std::vector> result; + result.reserve(value.size()); + for (auto &[id, val] : value) { + result.emplace_back(std::make_pair(id, std::move(val))); + } + return result; + }; + + auto collect_props = [&req](const VertexAccessor &v_acc, + const std::optional &e_acc) -> ShardResult> { + if (!req.property_ids) { + if (e_acc) { + return CollectAllPropertiesFromAccessor(*e_acc, view); + } + return CollectAllPropertiesFromAccessor(v_acc, view); + } + + if (e_acc) { + return CollectSpecificPropertiesFromAccessor(*e_acc, *req.property_ids, view); + } + return CollectSpecificPropertiesFromAccessor(v_acc, *req.property_ids, view); + }; + + auto find_edge = [](const VertexAccessor &v, msgs::EdgeId e) -> std::optional { + auto in = v.InEdges(view); + MG_ASSERT(in.HasValue()); + for (auto &edge : in.GetValue()) { + if (edge.Gid().AsUint() == e.gid) { + return edge; + } + } + auto out = v.OutEdges(view); + MG_ASSERT(out.HasValue()); + for (auto &edge : out.GetValue()) { + if (edge.Gid().AsUint() == e.gid) { + return edge; + } + } + return std::nullopt; + }; + + const auto has_expr_to_evaluate = !req.expressions.empty(); + auto emplace_result_row = + [dba, transform_props, collect_props, has_expr_to_evaluate, &req]( + const VertexAccessor &v_acc, + const std::optional e_acc) mutable -> ShardResult { + auto maybe_id = v_acc.Id(view); + if (maybe_id.HasError()) { + return {maybe_id.GetError()}; + } + const auto &id = maybe_id.GetValue(); + std::optional e_id; + if (e_acc) { + e_id = msgs::EdgeId{e_acc->Gid().AsUint()}; + } + msgs::VertexId v_id{msgs::Label{id.primary_label}, ConvertValueVector(id.primary_key)}; + auto maybe_props = collect_props(v_acc, e_acc); + if (maybe_props.HasError()) { + return {maybe_props.GetError()}; + } + auto props = transform_props(std::move(maybe_props.GetValue())); + auto result = msgs::GetPropertiesResultRow{.vertex = std::move(v_id), .edge = e_id, .props = std::move(props)}; + if (has_expr_to_evaluate) { + std::vector e_results; + if (e_acc) { + e_results = + ConvertToValueVectorFromTypedValueVector(EvaluateEdgeExpressions(dba, v_acc, *e_acc, req.expressions)); + } else { + e_results = ConvertToValueVectorFromTypedValueVector( + EvaluateVertexExpressions(dba, v_acc, req.expressions, expr::identifier_node_symbol)); + } + result.evaluated_expressions = std::move(e_results); + } + return {std::move(result)}; + }; + + auto get_limit = [&req](const auto &elements) { + size_t limit = elements.size(); + if (req.limit && *req.limit < elements.size()) { + limit = *req.limit; + } + return limit; + }; + + auto collect_response = [get_limit, &req](auto &elements, auto create_result_row) { + msgs::GetPropertiesResponse response; + const auto limit = get_limit(elements); + for (size_t index = 0; index != limit; ++index) { + auto result_row = create_result_row(elements[index]); + if (result_row.HasError()) { + return msgs::GetPropertiesResponse{.error = CreateErrorResponse(result_row.GetError(), req.transaction_id, "")}; + } + response.result_row.push_back(std::move(result_row.GetValue())); + } + return response; + }; + + std::vector vertices; + std::vector edges; + + auto parse_and_filter = [dba, &vertices](auto &container, auto projection, auto filter, auto maybe_get_edge) mutable { + for (const auto &elem : container) { + const auto &[label, pk_v] = projection(elem); + auto pk = ConvertPropertyVector(pk_v); + auto v_acc = dba.FindVertex(pk, view); + if (!v_acc || filter(*v_acc, maybe_get_edge(elem))) { + continue; + } + vertices.push_back(*v_acc); + } + }; + auto identity = [](auto &elem) { return elem; }; + + auto filter_vertex = [dba, req](const auto &acc, const auto & /*edge*/) mutable { + if (!req.filter) { + return false; + } + return !FilterOnVertex(dba, acc, {*req.filter}); + }; + + auto filter_edge = [dba, &edges, &req, find_edge](const auto &acc, const auto &edge) mutable { + auto e_acc = find_edge(acc, edge); + if (!e_acc) { + return true; + } + + if (req.filter && !FilterOnEdge(dba, acc, *e_acc, {*req.filter})) { + return true; + } + edges.push_back(*e_acc); + return false; + }; + + // Handler logic here + if (!req.vertex_ids.empty()) { + parse_and_filter(req.vertex_ids, identity, filter_vertex, identity); + } else { + parse_and_filter( + req.vertices_and_edges, [](auto &e) { return e.first; }, filter_edge, [](auto &e) { return e.second; }); + } + + if (!req.vertex_ids.empty()) { + if (!req.order_by.empty()) { + auto elements = OrderByVertices(dba, vertices, req.order_by); + return collect_response(elements, [emplace_result_row](auto &element) mutable { + return emplace_result_row(element.object_acc, std::nullopt); + }); + } + return collect_response(vertices, + [emplace_result_row](auto &acc) mutable { return emplace_result_row(acc, std::nullopt); }); + } + + if (!req.order_by.empty()) { + auto elements = OrderByEdges(dba, edges, req.order_by, vertices); + return collect_response(elements, [emplace_result_row](auto &element) mutable { + return emplace_result_row(element.object_acc.first, element.object_acc.second); + }); + } + + struct ZipView { + ZipView(std::vector &v, std::vector &e) : v(v), e(e) {} + size_t size() const { return v.size(); } + auto operator[](size_t index) { return std::make_pair(v[index], e[index]); } + + private: + std::vector &v; + std::vector &e; + }; + + ZipView vertices_and_edges(vertices, edges); + return collect_response(vertices_and_edges, [emplace_result_row](const auto &acc) mutable { + return emplace_result_row(acc.first, acc.second); + }); } } // namespace memgraph::storage::v3 diff --git a/tests/simulation/CMakeLists.txt b/tests/simulation/CMakeLists.txt index 9e1a4c71e..cd5fc0a4a 100644 --- a/tests/simulation/CMakeLists.txt +++ b/tests/simulation/CMakeLists.txt @@ -32,3 +32,4 @@ add_simulation_test(trial_query_storage/query_storage_test.cpp) add_simulation_test(sharded_map.cpp) add_simulation_test(shard_rsm.cpp) add_simulation_test(cluster_property_test.cpp) +add_simulation_test(request_router.cpp) diff --git a/tests/simulation/common.hpp b/tests/simulation/common.hpp index a73bf37a5..fcdc1338c 100644 --- a/tests/simulation/common.hpp +++ b/tests/simulation/common.hpp @@ -76,14 +76,10 @@ class MockedShardRsm { using WriteRequests = msgs::WriteRequests; using WriteResponses = msgs::WriteResponses; - // ExpandOneResponse Read(ExpandOneRequest rqst); - // GetPropertiesResponse Read(GetPropertiesRequest rqst); msgs::ScanVerticesResponse ReadImpl(msgs::ScanVerticesRequest rqst) { msgs::ScanVerticesResponse ret; auto as_prop_val = storage::conversions::ConvertPropertyVector(rqst.start_id.second); - if (!IsKeyInRange(as_prop_val)) { - ret.success = false; - } else if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) { + if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) { msgs::Value val(int64_t(0)); ret.next_start_id = std::make_optional(); ret.next_start_id->second = @@ -91,37 +87,46 @@ class MockedShardRsm { msgs::ScanResultRow result; result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); - ret.success = true; } else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) { msgs::ScanResultRow result; msgs::Value val(int64_t(1)); result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); - ret.success = true; } else if (as_prop_val == ShardRsmKey{PropertyValue(12), PropertyValue(13)}) { msgs::ScanResultRow result; msgs::Value val(int64_t(444)); result.props.push_back(std::make_pair(msgs::PropertyId::FromUint(0), val)); ret.results.push_back(std::move(result)); - ret.success = true; - } else { - ret.success = false; } return ret; } msgs::ExpandOneResponse ReadImpl(msgs::ExpandOneRequest rqst) { return {}; } - msgs::ExpandOneResponse ReadImpl(msgs::GetPropertiesRequest rqst) { return {}; } + msgs::GetPropertiesResponse ReadImpl(msgs::GetPropertiesRequest rqst) { + msgs::GetPropertiesResponse resp; + auto &vertices = rqst.vertex_ids; + for (auto &vertex : vertices) { + auto as_prop_val = storage::conversions::ConvertPropertyVector(vertex.second); + if (as_prop_val == ShardRsmKey{PropertyValue(0), PropertyValue(0)}) { + resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)}); + } else if (as_prop_val == ShardRsmKey{PropertyValue(1), PropertyValue(0)}) { + resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)}); + } else if (as_prop_val == ShardRsmKey{PropertyValue(13), PropertyValue(13)}) { + resp.result_row.push_back(msgs::GetPropertiesResultRow{.vertex = std::move(vertex)}); + } + } + return resp; + } ReadResponses Read(ReadRequests read_requests) { return {std::visit([this](T &&request) { return ReadResponses{ReadImpl(std::forward(request))}; }, std::move(read_requests))}; } - msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {.success = true}; } + msgs::CreateVerticesResponse ApplyImpl(msgs::CreateVerticesRequest rqst) { return {}; } msgs::DeleteVerticesResponse ApplyImpl(msgs::DeleteVerticesRequest rqst) { return {}; } msgs::UpdateVerticesResponse ApplyImpl(msgs::UpdateVerticesRequest rqst) { return {}; } - msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {.success = true}; } + msgs::CreateExpandResponse ApplyImpl(msgs::CreateExpandRequest rqst) { return {}; } msgs::DeleteEdgesResponse ApplyImpl(msgs::DeleteEdgesRequest rqst) { return {}; } msgs::UpdateEdgesResponse ApplyImpl(msgs::UpdateEdgesRequest rqst) { return {}; } msgs::CommitResponse ApplyImpl(msgs::CommitRequest rqst) { return {}; } diff --git a/tests/simulation/request_router.cpp b/tests/simulation/request_router.cpp index 8187f138b..e990c867e 100644 --- a/tests/simulation/request_router.cpp +++ b/tests/simulation/request_router.cpp @@ -152,9 +152,7 @@ void RunStorageRaft(Raft state{.label = "test_label"}; - - auto result = request_router.Request(state); + auto result = request_router.ScanVertices("test_label"); MG_ASSERT(result.size() == 2); { auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); @@ -162,18 +160,10 @@ void TestScanVertices(query::v2::RequestRouterInterface &request_router) { prop = result[1].GetProperty(msgs::PropertyId::FromUint(0)); MG_ASSERT(prop.int_v == 444); } - - result = request_router.Request(state); - { - MG_ASSERT(result.size() == 1); - auto prop = result[0].GetProperty(msgs::PropertyId::FromUint(0)); - MG_ASSERT(prop.int_v == 1); - } } void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; - msgs::ExecutionState state; std::vector new_vertices; auto label_id = request_router.NameToLabel("test_label"); msgs::NewVertex a1{.primary_key = {PropVal(int64_t(1)), PropVal(int64_t(0))}}; @@ -183,13 +173,13 @@ void TestCreateVertices(query::v2::RequestRouterInterface &request_router) { new_vertices.push_back(std::move(a1)); new_vertices.push_back(std::move(a2)); - auto result = request_router.Request(state, std::move(new_vertices)); + auto result = request_router.CreateVertices(std::move(new_vertices)); MG_ASSERT(result.size() == 2); } void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { using PropVal = msgs::Value; - msgs::ExecutionState state; + msgs::CreateExpandRequest state; std::vector new_expands; const auto edge_type_id = request_router.NameToEdgeType("edge_type"); @@ -203,24 +193,42 @@ void TestCreateExpand(query::v2::RequestRouterInterface &request_router) { new_expands.push_back(std::move(expand_1)); new_expands.push_back(std::move(expand_2)); - auto responses = request_router.Request(state, std::move(new_expands)); + auto responses = request_router.CreateExpand(std::move(new_expands)); MG_ASSERT(responses.size() == 2); - MG_ASSERT(responses[0].success); - MG_ASSERT(responses[1].success); + MG_ASSERT(!responses[0].error); + MG_ASSERT(!responses[1].error); } void TestExpandOne(query::v2::RequestRouterInterface &request_router) { - msgs::ExecutionState state{}; + msgs::ExpandOneRequest state{}; msgs::ExpandOneRequest request; const auto edge_type_id = request_router.NameToEdgeType("edge_type"); const auto label = msgs::Label{request_router.NameToLabel("test_label")}; request.src_vertices.push_back(msgs::VertexId{label, {msgs::Value(int64_t(0)), msgs::Value(int64_t(0))}}); request.edge_types.push_back(msgs::EdgeType{edge_type_id}); request.direction = msgs::EdgeDirection::BOTH; - auto result_rows = request_router.Request(state, std::move(request)); + auto result_rows = request_router.ExpandOne(std::move(request)); MG_ASSERT(result_rows.size() == 2); } +void TestGetProperties(query::v2::RequestRouterInterface &request_router) { + using PropVal = msgs::Value; + + auto label_id = request_router.NameToLabel("test_label"); + msgs::VertexId v0{{label_id}, {PropVal(int64_t(0)), PropVal(int64_t(0))}}; + msgs::VertexId v1{{label_id}, {PropVal(int64_t(1)), PropVal(int64_t(0))}}; + msgs::VertexId v2{{label_id}, {PropVal(int64_t(13)), PropVal(int64_t(13))}}; + + msgs::GetPropertiesRequest request; + + request.vertex_ids.push_back({v0}); + request.vertex_ids.push_back({v1}); + request.vertex_ids.push_back({v2}); + + auto result = request_router.GetProperties(std::move(request)); + MG_ASSERT(result.size() == 3); +} + template void TestAggregate(RequestRouter &request_router) {} @@ -343,6 +351,7 @@ void DoTest() { TestScanVertices(request_router); TestCreateVertices(request_router); TestCreateExpand(request_router); + TestGetProperties(request_router); simulator.ShutDown(); diff --git a/tests/simulation/shard_rsm.cpp b/tests/simulation/shard_rsm.cpp index 1577d4e0f..768217945 100644 --- a/tests/simulation/shard_rsm.cpp +++ b/tests/simulation/shard_rsm.cpp @@ -480,6 +480,65 @@ std::tuple> AttemptToScanAllWithExpression } } +msgs::GetPropertiesResponse AttemptToGetProperties( + ShardClient &client, std::optional> properties, std::vector vertices, + std::vector edges, std::optional limit = std::nullopt, + std::optional filter_prop = std::nullopt, bool edge = false, + std::optional order_by = std::nullopt) { + msgs::GetPropertiesRequest req{}; + req.transaction_id.logical_id = GetTransactionId(); + req.property_ids = std::move(properties); + + if (filter_prop) { + std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE.prop1 >= " : "MG_SYMBOL_EDGE.e_prop = "; + filter_expr += std::to_string(*filter_prop); + req.filter = std::make_optional(std::move(filter_expr)); + } + if (order_by) { + std::string filter_expr = (!edge) ? "MG_SYMBOL_NODE." : "MG_SYMBOL_EDGE."; + filter_expr += *order_by; + msgs::OrderBy order_by{.expression = {std::move(filter_expr)}, .direction = msgs::OrderingDirection::DESCENDING}; + std::vector request_order_by; + request_order_by.push_back(std::move(order_by)); + req.order_by = std::move(request_order_by); + } + if (limit) { + req.limit = limit; + } + req.expressions = {std::string("5 = 5")}; + std::vector req_v; + std::vector req_e; + for (auto &v : vertices) { + req_v.push_back(std::move(v)); + } + for (auto &e : edges) { + req_e.push_back(std::move(e)); + } + + if (!edges.empty()) { + MG_ASSERT(edges.size() == vertices.size()); + size_t id = 0; + req.vertices_and_edges.reserve(req_v.size()); + for (auto &v : req_v) { + req.vertices_and_edges.push_back({std::move(v), std::move(req_e[id++])}); + } + } else { + req.vertex_ids = std::move(req_v); + } + + while (true) { + auto read_res = client.SendReadRequest(req); + if (read_res.HasError()) { + continue; + } + + auto write_response_result = read_res.GetValue(); + auto write_response = std::get(write_response_result); + + return write_response; + } +} + void AttemptToScanAllWithOrderByOnPrimaryProperty(ShardClient &client, msgs::VertexId start_id, uint64_t batch_limit) { msgs::ScanVerticesRequest scan_req; scan_req.batch_limit = batch_limit; @@ -1204,6 +1263,205 @@ void TestExpandOneGraphTwo(ShardClient &client) { } } +void TestGetProperties(ShardClient &client) { + const auto unique_prop_val_1 = GetUniqueInteger(); + const auto unique_prop_val_2 = GetUniqueInteger(); + const auto unique_prop_val_3 = GetUniqueInteger(); + const auto unique_prop_val_4 = GetUniqueInteger(); + const auto unique_prop_val_5 = GetUniqueInteger(); + + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_1)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_2)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_3)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_4)); + MG_ASSERT(AttemptToCreateVertex(client, unique_prop_val_5)); + + const msgs::Label prim_label = {.id = get_primary_label()}; + const msgs::PrimaryKey prim_key = {msgs::Value(static_cast(unique_prop_val_1))}; + const msgs::VertexId v_id = {prim_label, prim_key}; + const msgs::PrimaryKey prim_key_2 = {msgs::Value(static_cast(unique_prop_val_2))}; + const msgs::VertexId v_id_2 = {prim_label, prim_key_2}; + const msgs::PrimaryKey prim_key_3 = {msgs::Value(static_cast(unique_prop_val_3))}; + const msgs::VertexId v_id_3 = {prim_label, prim_key_3}; + const msgs::PrimaryKey prim_key_4 = {msgs::Value(static_cast(unique_prop_val_4))}; + const msgs::VertexId v_id_4 = {prim_label, prim_key_4}; + const msgs::PrimaryKey prim_key_5 = {msgs::Value(static_cast(unique_prop_val_5))}; + const msgs::VertexId v_id_5 = {prim_label, prim_key_5}; + const auto prop_id_2 = PropertyId::FromUint(2); + const auto prop_id_4 = PropertyId::FromUint(4); + const auto prop_id_5 = PropertyId::FromUint(5); + // No properties + { + const auto result = AttemptToGetProperties(client, {{}}, {v_id, v_id_2}, {}); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 0); + } + } + // All properties + { + const auto result = AttemptToGetProperties(client, std::nullopt, {v_id, v_id_2}, {}); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 3); + } + } + { + // Specific properties + const auto result = + AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4, prop_id_5}, {v_id, v_id_2, v_id_3}, {}); + MG_ASSERT(!result.error); + MG_ASSERT(!result.result_row.empty()); + MG_ASSERT(result.result_row.size() == 3); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 3); + } + } + { + // Two properties from two vertices with a filter on unique_prop_5 + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2, prop_id_4}, {v_id, v_id_2, v_id_5}, {}, + std::nullopt, unique_prop_val_5); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 1); + } + { + // One property from three vertices. + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[2].props.size() == 1); + } + { + // Same as before but with limit of 1 row + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}, + std::make_optional(1)); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 1); + } + { + // Same as before but with a limit greater than the elements returned + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, std::vector{v_id, v_id_2, v_id_3}, {}, + std::make_optional(5)); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 3); + } + { + // Order by on `prop1` (descending) + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3}, {}, std::nullopt, + std::nullopt, false, "prop1"); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].vertex == v_id_3); + MG_ASSERT(result.result_row[1].vertex == v_id_2); + MG_ASSERT(result.result_row[2].vertex == v_id); + } + { + // Order by and filter on >= unique_prop_val_3 && assert result row data members + const auto result = AttemptToGetProperties(client, std::vector{prop_id_2}, {v_id, v_id_2, v_id_3, v_id_4, v_id_5}, + {}, std::nullopt, unique_prop_val_3, false, "prop1"); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 3); + MG_ASSERT(result.result_row[0].vertex == v_id_5); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().second == prim_key_5.front()); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().first == prop_id_2); + MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[1].vertex == v_id_4); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().second == prim_key_4.front()); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().first == prop_id_2); + MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[2].vertex == v_id_3); + MG_ASSERT(result.result_row[2].props.size() == 1); + MG_ASSERT(result.result_row[2].props.front().second == prim_key_3.front()); + MG_ASSERT(result.result_row[2].props.size() == 1); + MG_ASSERT(result.result_row[2].props.front().first == prop_id_2); + MG_ASSERT(result.result_row[2].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[2].evaluated_expressions.front() == msgs::Value(true)); + } + + // Edges + const auto edge_gid = GetUniqueInteger(); + const auto edge_type_id = EdgeTypeId::FromUint(GetUniqueInteger()); + const auto unique_edge_prop_id = 7; + const auto edge_prop_val = GetUniqueInteger(); + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_1, unique_prop_val_2, edge_gid, unique_edge_prop_id, + edge_prop_val, {edge_type_id})); + const auto edge_gid_2 = GetUniqueInteger(); + const auto edge_prop_val_2 = GetUniqueInteger(); + MG_ASSERT(AttemptToAddEdgeWithProperties(client, unique_prop_val_3, unique_prop_val_4, edge_gid_2, + unique_edge_prop_id, edge_prop_val_2, {edge_type_id})); + const auto edge_prop_id = PropertyId::FromUint(unique_edge_prop_id); + std::vector edge_ids = {{edge_gid}, {edge_gid_2}}; + // No properties + { + const auto result = AttemptToGetProperties(client, {{}}, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 0); + } + } + // All properties + { + const auto result = AttemptToGetProperties(client, std::nullopt, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + for (const auto &elem : result.result_row) { + MG_ASSERT(elem.props.size() == 1); + } + } + // Properties for two vertices + { + const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + } + // Filter + { + const auto result = AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, + {edge_prop_val}, true); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 1); + MG_ASSERT(result.result_row.front().edge); + MG_ASSERT(result.result_row.front().edge.value().gid == edge_gid); + MG_ASSERT(result.result_row.front().props.size() == 1); + MG_ASSERT(result.result_row.front().props.front().second == msgs::Value(static_cast(edge_prop_val))); + } + // Order by + { + const auto result = + AttemptToGetProperties(client, std::vector{edge_prop_id}, {v_id_2, v_id_3}, edge_ids, {}, {}, true, "e_prop"); + MG_ASSERT(!result.error); + MG_ASSERT(result.result_row.size() == 2); + MG_ASSERT(result.result_row[0].vertex == v_id_3); + MG_ASSERT(result.result_row[0].edge); + MG_ASSERT(result.result_row[0].edge.value().gid == edge_gid_2); + MG_ASSERT(result.result_row[0].props.size() == 1); + MG_ASSERT(result.result_row[0].props.front().second == msgs::Value(static_cast(edge_prop_val_2))); + MG_ASSERT(result.result_row[0].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[0].evaluated_expressions.front() == msgs::Value(true)); + + MG_ASSERT(result.result_row[1].vertex == v_id_2); + MG_ASSERT(result.result_row[1].edge); + MG_ASSERT(result.result_row[1].edge.value().gid == edge_gid); + MG_ASSERT(result.result_row[1].props.size() == 1); + MG_ASSERT(result.result_row[1].props.front().second == msgs::Value(static_cast(edge_prop_val))); + MG_ASSERT(result.result_row[1].evaluated_expressions.size() == 1); + MG_ASSERT(result.result_row[1].evaluated_expressions.front() == msgs::Value(true)); + } +} + } // namespace int TestMessages() { @@ -1242,9 +1500,12 @@ int TestMessages() { auto shard_ptr2 = std::make_unique(get_primary_label(), min_prim_key, max_prim_key, schema_prop); auto shard_ptr3 = std::make_unique(get_primary_label(), min_prim_key, max_prim_key, schema_prop); - shard_ptr1->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); - shard_ptr2->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); - shard_ptr3->StoreMapping({{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}}); + shard_ptr1->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); + shard_ptr2->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); + shard_ptr3->StoreMapping( + {{1, "label"}, {2, "prop1"}, {3, "label1"}, {4, "prop2"}, {5, "prop3"}, {6, "prop4"}, {7, "e_prop"}}); std::vector
address_for_1{shard_server_2_address, shard_server_3_address}; std::vector
address_for_2{shard_server_1_address, shard_server_3_address}; @@ -1286,6 +1547,8 @@ int TestMessages() { TestExpandOneGraphOne(client); TestExpandOneGraphTwo(client); + // GetProperties tests + TestGetProperties(client); simulator.ShutDown(); SimulatorStats stats = simulator.Stats(); diff --git a/tests/unit/query_v2_expression_evaluator.cpp b/tests/unit/query_v2_expression_evaluator.cpp index 5f77ed4e7..f819ee16f 100644 --- a/tests/unit/query_v2_expression_evaluator.cpp +++ b/tests/unit/query_v2_expression_evaluator.cpp @@ -51,6 +51,8 @@ using memgraph::msgs::CreateVerticesResponse; using memgraph::msgs::ExpandOneRequest; using memgraph::msgs::ExpandOneResponse; using memgraph::msgs::ExpandOneResultRow; +using memgraph::msgs::GetPropertiesRequest; +using memgraph::msgs::GetPropertiesResultRow; using memgraph::msgs::NewExpand; using memgraph::msgs::NewVertex; using memgraph::msgs::ScanVerticesRequest; @@ -92,6 +94,8 @@ class MockedRequestRouter : public RequestRouterInterface { std::vector CreateExpand(std::vector new_edges) override { return {}; } + std::vector GetProperties(GetPropertiesRequest rqst) override { return {}; } + const std::string &PropertyToName(memgraph::storage::v3::PropertyId id) const override { return properties_.IdToName(id.AsUint()); }