diff --git a/src/storage/v3/shard_rsm.cpp b/src/storage/v3/shard_rsm.cpp
index 9a124b250..2fb2c8df8 100644
--- a/src/storage/v3/shard_rsm.cpp
+++ b/src/storage/v3/shard_rsm.cpp
@@ -12,11 +12,13 @@
 #include <algorithm>
 #include <functional>
 #include <iterator>
+#include <optional>
 #include <unordered_set>
 #include <utility>
 
 #include "parser/opencypher/parser.hpp"
 #include "query/v2/requests.hpp"
+#include "storage/v2/vertex.hpp"
 #include "storage/v2/view.hpp"
 #include "storage/v3/bindings/ast/ast.hpp"
 #include "storage/v3/bindings/cypher_main_visitor.hpp"
@@ -180,8 +182,8 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA
 
 struct LocalError {};
 
-std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccessor> &v_acc,
-                                               const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) {
+std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
+                                                                          const msgs::ExpandOneRequest &req) {
   auto secondary_labels = v_acc->Labels(View::NEW);
   if (secondary_labels.HasError()) {
     spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
@@ -190,14 +192,13 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
   }
 
   auto &sec_labels = secondary_labels.GetValue();
-  msgs::Vertex source_vertex;
-  source_vertex.id = src_vertex;
-  source_vertex.labels.reserve(sec_labels.size());
+  std::vector<msgs::Label> msgs_secondary_labels;
+  msgs_secondary_labels.reserve(sec_labels.size());
 
-  std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels),
+  std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels),
                  [](auto label_id) { return msgs::Label{.id = label_id}; });
 
-  return source_vertex;
+  return msgs_secondary_labels;
 }
 
 std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
@@ -322,13 +323,16 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
     const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
     const Schemas::Schema *schema) {
   /// Fill up source vertex
-  const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
+  const auto primary_key = ConvertPropertyVector(src_vertex.second);
   auto v_acc = acc.FindVertex(primary_key, View::NEW);
 
-  auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
-  if (!source_vertex) {
+  msgs::Vertex source_vertex = {.id = src_vertex};
+  if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
+    source_vertex.labels = *maybe_secondary_labels;
+  } else {
     return std::nullopt;
   }
+
   std::optional<std::map<PropertyId, Value>> src_vertex_properties;
   src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
 
@@ -345,7 +349,7 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
   auto [in_edges, out_edges] = fill_up_connecting_edges.value();
 
   msgs::ExpandOneResultRow result_row;
-  result_row.src_vertex = std::move(*source_vertex);
+  result_row.src_vertex = std::move(source_vertex);
   result_row.src_vertex_properties = std::move(*src_vertex_properties);
   static constexpr bool kInEdges = true;
   static constexpr bool kOutEdges = false;
diff --git a/tests/e2e/distributed_queries/CMakeLists.txt b/tests/e2e/distributed_queries/CMakeLists.txt
index 9a2c48c63..455e6ad45 100644
--- a/tests/e2e/distributed_queries/CMakeLists.txt
+++ b/tests/e2e/distributed_queries/CMakeLists.txt
@@ -3,6 +3,7 @@ function(distributed_queries_e2e_python_files FILE_NAME)
 endfunction()
 
 distributed_queries_e2e_python_files(distributed_queries.py)
+distributed_queries_e2e_python_files(distributed_expand_one.py)
 distributed_queries_e2e_python_files(unwind_collect.py)
 distributed_queries_e2e_python_files(order_by_and_limit.py)
 distributed_queries_e2e_python_files(distinct.py)
diff --git a/tests/e2e/distributed_queries/common.py b/tests/e2e/distributed_queries/common.py
index 3588a803c..b99061eb0 100644
--- a/tests/e2e/distributed_queries/common.py
+++ b/tests/e2e/distributed_queries/common.py
@@ -9,11 +9,11 @@
 # by the Apache License, Version 2.0, included in the file
 # licenses/APL.txt.
 
-import typing
-import mgclient
-import sys
-import pytest
 import time
+from typing import List, Optional
+
+import mgclient
+import pytest
 
 
 @pytest.fixture(autouse=True)
@@ -28,12 +28,12 @@ def connect(**kwargs) -> mgclient.Connection:
     return connection
 
 
-def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
-    cursor.execute(query, params)
+def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: Optional[dict] = None) -> List[tuple]:
+    cursor.execute(query, params if params else {})
     return cursor.fetchall()
 
 
-def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
+def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int) -> bool:
     results = execute_and_fetch_all(cursor, query)
     return len(results) == n
 
diff --git a/tests/e2e/distributed_queries/distributed_expand_one.py b/tests/e2e/distributed_queries/distributed_expand_one.py
new file mode 100644
index 000000000..9d3d81074
--- /dev/null
+++ b/tests/e2e/distributed_queries/distributed_expand_one.py
@@ -0,0 +1,37 @@
+# Copyright 2022 Memgraph Ltd.
+#
+# Use of this software is governed by the Business Source License
+# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
+# License, and you may not use this file except in compliance with the Business Source License.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0, included in the file
+# licenses/APL.txt.
+
+import sys
+
+import pytest
+
+from common import connection, execute_and_fetch_all, has_n_result_row, wait_for_shard_manager_to_initialize
+
+
+def test_sequenced_expand_one(connection):
+    wait_for_shard_manager_to_initialize()
+    cursor = connection.cursor()
+
+    for i in range(1, 4):
+        assert has_n_result_row(cursor, f"CREATE (:label {{property:{i}}})", 0), f"Failed creating node"
+    assert has_n_result_row(cursor, "MATCH (n {property:1}), (m {property:2}) CREATE (n)-[:TO]->(m)", 0)
+    assert has_n_result_row(cursor, "MATCH (n {property:2}), (m {property:3}) CREATE (n)-[:TO]->(m)", 0)
+
+    results = execute_and_fetch_all(cursor, "MATCH (n)-[:TO]->(m)-[:TO]->(l) RETURN n,m,l")
+    assert len(results) == 1
+    n, m, l = results[0]
+    assert n.properties["property"] == 1
+    assert m.properties["property"] == 2
+    assert l.properties["property"] == 3
+
+
+if __name__ == "__main__":
+    sys.exit(pytest.main([__file__, "-rA"]))
diff --git a/tests/e2e/distributed_queries/workloads.yaml b/tests/e2e/distributed_queries/workloads.yaml
index c4ba13500..741d6d939 100644
--- a/tests/e2e/distributed_queries/workloads.yaml
+++ b/tests/e2e/distributed_queries/workloads.yaml
@@ -12,6 +12,11 @@ workloads:
     args: ["distributed_queries/distributed_queries.py"]
     <<: *template_cluster
 
+  - name: "Distributed expand one"
+    binary: "tests/e2e/pytest_runner.sh"
+    args: ["distributed_queries/distributed_expand_one.py"]
+    <<: *template_cluster
+
   - name: "Distributed unwind collect"
     binary: "tests/e2e/pytest_runner.sh"
     args: ["distributed_queries/unwind_collect.py"]