Merge pull request #638 from memgraph/T1148-MG-fix-expand-one-source-vertex
Fix setting the primary key of source vertex in ExpandOne
This commit is contained in:
commit
94b66a4e81
@ -12,11 +12,13 @@
|
||||
#include <algorithm>
|
||||
#include <functional>
|
||||
#include <iterator>
|
||||
#include <optional>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
|
||||
#include "parser/opencypher/parser.hpp"
|
||||
#include "query/v2/requests.hpp"
|
||||
#include "storage/v2/vertex.hpp"
|
||||
#include "storage/v2/view.hpp"
|
||||
#include "storage/v3/bindings/ast/ast.hpp"
|
||||
#include "storage/v3/bindings/cypher_main_visitor.hpp"
|
||||
@ -180,8 +182,8 @@ std::vector<TypedValue> EvaluateVertexExpressions(DbAccessor &dba, const VertexA
|
||||
|
||||
struct LocalError {};
|
||||
|
||||
std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req, msgs::VertexId src_vertex) {
|
||||
std::optional<std::vector<msgs::Label>> FillUpSourceVertexSecondaryLabels(const std::optional<VertexAccessor> &v_acc,
|
||||
const msgs::ExpandOneRequest &req) {
|
||||
auto secondary_labels = v_acc->Labels(View::NEW);
|
||||
if (secondary_labels.HasError()) {
|
||||
spdlog::debug("Encountered an error while trying to get the secondary labels of a vertex. Transaction id: {}",
|
||||
@ -190,14 +192,13 @@ std::optional<msgs::Vertex> FillUpSourceVertex(const std::optional<VertexAccesso
|
||||
}
|
||||
|
||||
auto &sec_labels = secondary_labels.GetValue();
|
||||
msgs::Vertex source_vertex;
|
||||
source_vertex.id = src_vertex;
|
||||
source_vertex.labels.reserve(sec_labels.size());
|
||||
std::vector<msgs::Label> msgs_secondary_labels;
|
||||
msgs_secondary_labels.reserve(sec_labels.size());
|
||||
|
||||
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(source_vertex.labels),
|
||||
std::transform(sec_labels.begin(), sec_labels.end(), std::back_inserter(msgs_secondary_labels),
|
||||
[](auto label_id) { return msgs::Label{.id = label_id}; });
|
||||
|
||||
return source_vertex;
|
||||
return msgs_secondary_labels;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> FillUpSourceVertexProperties(const std::optional<VertexAccessor> &v_acc,
|
||||
@ -322,13 +323,16 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
const EdgeUniqunessFunction &maybe_filter_based_on_edge_uniquness, const EdgeFiller &edge_filler,
|
||||
const Schemas::Schema *schema) {
|
||||
/// Fill up source vertex
|
||||
const auto primary_key = ConvertPropertyVector(std::move(src_vertex.second));
|
||||
const auto primary_key = ConvertPropertyVector(src_vertex.second);
|
||||
auto v_acc = acc.FindVertex(primary_key, View::NEW);
|
||||
|
||||
auto source_vertex = FillUpSourceVertex(v_acc, req, src_vertex);
|
||||
if (!source_vertex) {
|
||||
msgs::Vertex source_vertex = {.id = src_vertex};
|
||||
if (const auto maybe_secondary_labels = FillUpSourceVertexSecondaryLabels(v_acc, req); maybe_secondary_labels) {
|
||||
source_vertex.labels = *maybe_secondary_labels;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
std::optional<std::map<PropertyId, Value>> src_vertex_properties;
|
||||
src_vertex_properties = FillUpSourceVertexProperties(v_acc, req, storage::v3::View::NEW, schema);
|
||||
|
||||
@ -345,7 +349,7 @@ std::optional<msgs::ExpandOneResultRow> GetExpandOneResult(
|
||||
auto [in_edges, out_edges] = fill_up_connecting_edges.value();
|
||||
|
||||
msgs::ExpandOneResultRow result_row;
|
||||
result_row.src_vertex = std::move(*source_vertex);
|
||||
result_row.src_vertex = std::move(source_vertex);
|
||||
result_row.src_vertex_properties = std::move(*src_vertex_properties);
|
||||
static constexpr bool kInEdges = true;
|
||||
static constexpr bool kOutEdges = false;
|
||||
|
@ -3,6 +3,7 @@ function(distributed_queries_e2e_python_files FILE_NAME)
|
||||
endfunction()
|
||||
|
||||
distributed_queries_e2e_python_files(distributed_queries.py)
|
||||
distributed_queries_e2e_python_files(distributed_expand_one.py)
|
||||
distributed_queries_e2e_python_files(unwind_collect.py)
|
||||
distributed_queries_e2e_python_files(order_by_and_limit.py)
|
||||
distributed_queries_e2e_python_files(distinct.py)
|
||||
|
@ -9,11 +9,11 @@
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import typing
|
||||
import mgclient
|
||||
import sys
|
||||
import pytest
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
import mgclient
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@ -28,12 +28,12 @@ def connect(**kwargs) -> mgclient.Connection:
|
||||
return connection
|
||||
|
||||
|
||||
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: dict = {}) -> typing.List[tuple]:
|
||||
cursor.execute(query, params)
|
||||
def execute_and_fetch_all(cursor: mgclient.Cursor, query: str, params: Optional[dict] = None) -> List[tuple]:
|
||||
cursor.execute(query, params if params else {})
|
||||
return cursor.fetchall()
|
||||
|
||||
|
||||
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int):
|
||||
def has_n_result_row(cursor: mgclient.Cursor, query: str, n: int) -> bool:
|
||||
results = execute_and_fetch_all(cursor, query)
|
||||
return len(results) == n
|
||||
|
||||
|
37
tests/e2e/distributed_queries/distributed_expand_one.py
Normal file
37
tests/e2e/distributed_queries/distributed_expand_one.py
Normal file
@ -0,0 +1,37 @@
|
||||
# Copyright 2022 Memgraph Ltd.
|
||||
#
|
||||
# Use of this software is governed by the Business Source License
|
||||
# included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
# License, and you may not use this file except in compliance with the Business Source License.
|
||||
#
|
||||
# As of the Change Date specified in that file, in accordance with
|
||||
# the Business Source License, use of this software will be governed
|
||||
# by the Apache License, Version 2.0, included in the file
|
||||
# licenses/APL.txt.
|
||||
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
from common import connection, execute_and_fetch_all, has_n_result_row, wait_for_shard_manager_to_initialize
|
||||
|
||||
|
||||
def test_sequenced_expand_one(connection):
|
||||
wait_for_shard_manager_to_initialize()
|
||||
cursor = connection.cursor()
|
||||
|
||||
for i in range(1, 4):
|
||||
assert has_n_result_row(cursor, f"CREATE (:label {{property:{i}}})", 0), f"Failed creating node"
|
||||
assert has_n_result_row(cursor, "MATCH (n {property:1}), (m {property:2}) CREATE (n)-[:TO]->(m)", 0)
|
||||
assert has_n_result_row(cursor, "MATCH (n {property:2}), (m {property:3}) CREATE (n)-[:TO]->(m)", 0)
|
||||
|
||||
results = execute_and_fetch_all(cursor, "MATCH (n)-[:TO]->(m)-[:TO]->(l) RETURN n,m,l")
|
||||
assert len(results) == 1
|
||||
n, m, l = results[0]
|
||||
assert n.properties["property"] == 1
|
||||
assert m.properties["property"] == 2
|
||||
assert l.properties["property"] == 3
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main([__file__, "-rA"]))
|
@ -12,6 +12,11 @@ workloads:
|
||||
args: ["distributed_queries/distributed_queries.py"]
|
||||
<<: *template_cluster
|
||||
|
||||
- name: "Distributed expand one"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["distributed_queries/distributed_expand_one.py"]
|
||||
<<: *template_cluster
|
||||
|
||||
- name: "Distributed unwind collect"
|
||||
binary: "tests/e2e/pytest_runner.sh"
|
||||
args: ["distributed_queries/unwind_collect.py"]
|
||||
|
Loading…
Reference in New Issue
Block a user