From 3a8f01af792d44c9faef77249f0e04123749c6ef Mon Sep 17 00:00:00 2001 From: jbajic Date: Thu, 3 Nov 2022 16:29:28 +0100 Subject: [PATCH 1/5] Fix expand one source vertex pk setting --- src/storage/v3/shard_rsm.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 9a124b250..2e854e659 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -181,7 +181,7 @@ std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexA struct LocalError {}; std::optional FillUpSourceVertex(const std::optional &v_acc, - const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) { + const msgs::ExpandOneRequest &req) { auto secondary_labels = v_acc->Labels(View::NEW); if (secondary_labels.HasError()) { spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", @@ -191,7 +191,19 @@ std::optional FillUpSourceVertex(const std::optionalPrimaryLabel(View::NEW); + if (vertex_label.HasError()) { + spdlog::debug("Encountered an error while trying to get the primary label of source vertex. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + const auto vertex_pk = v_acc->PrimaryKey(View::NEW); + if (vertex_pk.HasError()) { + spdlog::debug("Encountered an error while trying to get the primary key of source vertex. Transaction id: {}", + req.transaction_id.logical_id); + return std::nullopt; + } + source_vertex.id = msgs::VertexId{msgs::Label{*vertex_label}, conversions::ConvertValueVector(*vertex_pk)}; source_vertex.labels.reserve(sec_labels.size()); std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels), @@ -325,7 +337,7 @@ std::optional GetExpandOneResult( const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second)); auto v_acc = acc.FindVertex(primary_key, View::NEW); - auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex); + auto source_vertex = FillUpSourceVertex(v_acc, req); if (!source_vertex) { return std::nullopt; } From bab5e1386a51c19352e8c4b82b782a0c41ea4779 Mon Sep 17 00:00:00 2001 From: jbajic Date: Fri, 4 Nov 2022 15:05:55 +0100 Subject: [PATCH 2/5] Address review comments --- src/storage/v3/shard_rsm.cpp | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 2e854e659..77f77f2e7 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -180,8 +180,8 @@ std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexA struct LocalError {}; -std::optional FillUpSourceVertex(const std::optional &v_acc, - const msgs::ExpandOneRequest &req) { +std::optional FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, + const msgs::ExpandOneRequest &req) { auto secondary_labels = v_acc->Labels(View::NEW); if (secondary_labels.HasError()) { spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", @@ -191,19 +191,6 @@ std::optional FillUpSourceVertex(const std::optionalPrimaryLabel(View::NEW); - if (vertex_label.HasError()) { - spdlog::debug("Encountered an error while trying to get the primary label of source vertex. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - const auto vertex_pk = v_acc->PrimaryKey(View::NEW); - if (vertex_pk.HasError()) { - spdlog::debug("Encountered an error while trying to get the primary key of source vertex. Transaction id: {}", - req.transaction_id.logical_id); - return std::nullopt; - } - source_vertex.id = msgs::VertexId{msgs::Label{*vertex_label}, conversions::ConvertValueVector(*vertex_pk)}; source_vertex.labels.reserve(sec_labels.size()); std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels), @@ -334,13 +321,14 @@ std::optional GetExpandOneResult( const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler, const Schemas::Schema *schema) { /// Fill up source vertex - const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second)); + const auto primary_key = ConvertPropertyVector(src_vertex.second); auto v_acc = acc.FindVertex(primary_key, View::NEW); - auto source_vertex = FillUpSourceVertex(v_acc, req); + auto source_vertex = FillUpSourceVertexSecondaryLabels(v_acc, req); if (!source_vertex) { return std::nullopt; } + source_vertex->id = src_vertex; std::optional> src_vertex_properties; src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); From e52ce1e3637885182f48e8f06ad0311c5e40305b Mon Sep 17 00:00:00 2001 From: jbajic Date: Fri, 4 Nov 2022 16:18:18 +0100 Subject: [PATCH 3/5] Add expandone test --- tests/e2e/distributed_queries/common.py | 14 +++---- .../distributed_expand_one.py | 37 +++++++++++++++++++ tests/e2e/distributed_queries/workloads.yaml | 5 +++ 3 files changed, 49 insertions(+), 7 deletions(-) create mode 100644 tests/e2e/distributed_queries/distributed_expand_one.py diff --git a/tests/e2e/distributed_queries/common.py b/tests/e2e/distributed_queries/common.py index 3588a803c..b99061eb0 100644 --- a/tests/e2e/distributed_queries/common.py +++ b/tests/e2e/distributed_queries/common.py @@ -9,11 +9,11 @@ # by the Apache License, Version 2.0, included in the file # licenses/APL.txt. -import typing -import mgclient -import sys -import pytest import time +from typing import List, Optional + +import mgclient +import pytest @pytest.fixture(autouse=True) @@ -28,12 +28,12 @@ def connect(**kwargs) -> mgclient.Connection: return connection -def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]: - cursor.execute(query, params) +def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: Optional[dict] = None) -> List[tuple]: + cursor.execute(query, params if params else {}) return cursor.fetchall() -def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int): +def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int) -> bool: results = execute_and_fetch_all(cursor, query) return len(results) == n diff --git a/tests/e2e/distributed_queries/distributed_expand_one.py b/tests/e2e/distributed_queries/distributed_expand_one.py new file mode 100644 index 000000000..9d3d81074 --- /dev/null +++ b/tests/e2e/distributed_queries/distributed_expand_one.py @@ -0,0 +1,37 @@ +# Copyright 2022 Memgraph Ltd. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source +# License, and you may not use this file except in compliance with the Business Source License. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0, included in the file +# licenses/APL.txt. + +import sys + +import pytest + +from common import connection, execute_and_fetch_all, has_n_result_row, wait_for_shard_manager_to_initialize + + +def test_sequenced_expand_one(connection): + wait_for_shard_manager_to_initialize() + cursor = connection.cursor() + + for i in range(1, 4): + assert has_n_result_row(cursor, f"CREATE (:label {{property:{i}}})", 0), f"Failed creating node" + assert has_n_result_row(cursor, "MATCH (n {property:1}), (m {property:2}) CREATE (n)-[:TO]->(m)", 0) + assert has_n_result_row(cursor, "MATCH (n {property:2}), (m {property:3}) CREATE (n)-[:TO]->(m)", 0) + + results = execute_and_fetch_all(cursor, "MATCH (n)-[:TO]->(m)-[:TO]->(l) RETURN n,m,l") + assert len(results) == 1 + n, m, l = results[0] + assert n.properties["property"] == 1 + assert m.properties["property"] == 2 + assert l.properties["property"] == 3 + + +if __name__ == "__main__": + sys.exit(pytest.main([__file__, "-rA"])) diff --git a/tests/e2e/distributed_queries/workloads.yaml b/tests/e2e/distributed_queries/workloads.yaml index c4ba13500..741d6d939 100644 --- a/tests/e2e/distributed_queries/workloads.yaml +++ b/tests/e2e/distributed_queries/workloads.yaml @@ -12,6 +12,11 @@ workloads: args: ["distributed_queries/distributed_queries.py"] <<: *template_cluster + - name: "Distributed expand one" + binary: "tests/e2e/pytest_runner.sh" + args: ["distributed_queries/distributed_expand_one.py"] + <<: *template_cluster + - name: "Distributed unwind collect" binary: "tests/e2e/pytest_runner.sh" args: ["distributed_queries/unwind_collect.py"] From 4d2036249e8211732670f94b9ec1b42bb2ab5d76 Mon Sep 17 00:00:00 2001 From: jbajic Date: Fri, 4 Nov 2022 16:37:35 +0100 Subject: [PATCH 4/5] Fix e2e test --- tests/e2e/distributed_queries/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/e2e/distributed_queries/CMakeLists.txt b/tests/e2e/distributed_queries/CMakeLists.txt index 9a2c48c63..455e6ad45 100644 --- a/tests/e2e/distributed_queries/CMakeLists.txt +++ b/tests/e2e/distributed_queries/CMakeLists.txt @@ -3,6 +3,7 @@ function(distributed_queries_e2e_python_files FILE_NAME) endfunction() distributed_queries_e2e_python_files(distributed_queries.py) +distributed_queries_e2e_python_files(distributed_expand_one.py) distributed_queries_e2e_python_files(unwind_collect.py) distributed_queries_e2e_python_files(order_by_and_limit.py) distributed_queries_e2e_python_files(distinct.py) From 88487e2513674e156230a0921191c095515cf6b0 Mon Sep 17 00:00:00 2001 From: jbajic Date: Mon, 7 Nov 2022 10:54:28 +0100 Subject: [PATCH 5/5] Extract vertex creation from FillUpSourceVertexSecondaryLabels --- src/storage/v3/shard_rsm.cpp | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp index 77f77f2e7..2fb2c8df8 100644 --- a/src/storage/v3/shard_rsm.cpp +++ b/src/storage/v3/shard_rsm.cpp @@ -12,11 +12,13 @@ #include #include #include +#include #include #include #include "parser/opencypher/parser.hpp" #include "query/v2/requests.hpp" +#include "storage/v2/vertex.hpp" #include "storage/v2/view.hpp" #include "storage/v3/bindings/ast/ast.hpp" #include "storage/v3/bindings/cypher_main_visitor.hpp" @@ -180,8 +182,8 @@ std::vector EvaluateVertexExpressions(DbAccessor &dba, const VertexA struct LocalError {}; -std::optional FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, - const msgs::ExpandOneRequest &req) { +std::optional> FillUpSourceVertexSecondaryLabels(const std::optional &v_acc, + const msgs::ExpandOneRequest &req) { auto secondary_labels = v_acc->Labels(View::NEW); if (secondary_labels.HasError()) { spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}", @@ -190,13 +192,13 @@ std::optional FillUpSourceVertexSecondaryLabels(const std::optiona } auto &sec_labels = secondary_labels.GetValue(); - msgs::Vertex source_vertex; - source_vertex.labels.reserve(sec_labels.size()); + std::vector msgs_secondary_labels; + msgs_secondary_labels.reserve(sec_labels.size()); - std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels), + std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels), [](auto label_id) { return msgs::Label{.id = label_id}; }); - return source_vertex; + return msgs_secondary_labels; } std::optional> FillUpSourceVertexProperties(const std::optional &v_acc, @@ -324,11 +326,13 @@ std::optional GetExpandOneResult( const auto primary_key = ConvertPropertyVector(src_vertex.second); auto v_acc = acc.FindVertex(primary_key, View::NEW); - auto source_vertex = FillUpSourceVertexSecondaryLabels(v_acc, req); - if (!source_vertex) { + msgs::Vertex source_vertex = {.id = src_vertex}; + if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) { + source_vertex.labels = *maybe_secondary_labels; + } else { return std::nullopt; } - source_vertex->id = src_vertex; + std::optional> src_vertex_properties; src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema); @@ -345,7 +349,7 @@ std::optional GetExpandOneResult( auto [in_edges, out_edges] = fill_up_connecting_edges.value(); msgs::ExpandOneResultRow result_row; - result_row.src_vertex = std::move(*source_vertex); + result_row.src_vertex = std::move(source_vertex); result_row.src_vertex_properties = std::move(*src_vertex_properties); static constexpr bool kInEdges = true; static constexpr bool kOutEdges = false;