From 28ba87266806b77abd94e512c8416717e55a8a35 Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Wed, 29 Aug 2018 10:26:27 +0200 Subject: [PATCH] Implement single node two-sided BFS Reviewers: mculinovic, teon.banek, ipaljak, buda, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1567 --- src/query/interpret/frame.hpp | 5 + src/query/plan/distributed_ops.cpp | 6 +- src/query/plan/operator.cpp | 345 ++++++++++++++--- src/query/plan/operator.lcp | 1 - src/utils/algorithm.hpp | 7 + .../clients/bfs_pokec_client.cpp | 7 +- tests/macro_benchmark/run_bfs_pokec | 1 - tests/unit/query_plan_common.hpp | 14 + tests/unit/query_plan_match_filter_return.cpp | 352 ++++++++++++++---- 9 files changed, 602 insertions(+), 136 deletions(-) diff --git a/src/query/interpret/frame.hpp b/src/query/interpret/frame.hpp index 02688d8e9..d53ca7e9d 100644 --- a/src/query/interpret/frame.hpp +++ b/src/query/interpret/frame.hpp @@ -18,6 +18,11 @@ class Frame { return elems_[symbol.position()]; } + TypedValue &at(const Symbol &symbol) { return elems_.at(symbol.position()); } + const TypedValue &at(const Symbol &symbol) const { + return elems_.at(symbol.position()); + } + auto &elems() { return elems_; } private: diff --git a/src/query/plan/distributed_ops.cpp b/src/query/plan/distributed_ops.cpp index 0753212de..d077c94cb 100644 --- a/src/query/plan/distributed_ops.cpp +++ b/src/query/plan/distributed_ops.cpp @@ -841,7 +841,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { upper_bound_ = self_.upper_bound() ? EvaluateInt(&evaluator, self_.upper_bound(), "Max depth in breadth-first expansion") - : std::numeric_limits::max(); + : std::numeric_limits::max(); skip_rest_ = false; if (upper_bound_ < 1) { @@ -870,8 +870,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor { // Depth bounds. Calculated on each pull from the input, the initial value // is irrelevant. - int lower_bound_{-1}; - int upper_bound_{-1}; + int64_t lower_bound_{-1}; + int64_t upper_bound_{-1}; // When set to true, expansion is restarted from a new source. bool skip_rest_{false}; diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp index c7be4eb8d..35c3618d2 100644 --- a/src/query/plan/operator.cpp +++ b/src/query/plan/operator.cpp @@ -1019,14 +1019,247 @@ class ExpandVariableCursor : public Cursor { } }; -class ExpandBfsCursor : public query::plan::Cursor { +class STShortestPathCursor : public query::plan::Cursor { public: - ExpandBfsCursor(const ExpandVariable &self, database::GraphDbAccessor &db) - : self_(self), input_cursor_(self_.input_->MakeCursor(db)) {} + STShortestPathCursor(const ExpandVariable &self, + database::GraphDbAccessor &dba) + : self_(self), input_cursor_(self_.input()->MakeCursor(dba)) { + CHECK(self_.graph_view() == GraphView::OLD) + << "ExpandVariable should only be planned with GraphView::OLD"; + CHECK(self_.existing_node()) << "s-t shortest path algorithm should only " + "be used when `existing_node` flag is " + "set!"; + } bool Pull(Frame &frame, Context &context) override { - // evaluator for the filtering condition - ExpressionEvaluator evaluator(frame, &context, self_.graph_view_); + ExpressionEvaluator evaluator(frame, &context, GraphView::OLD); + while (input_cursor_->Pull(frame, context)) { + auto source_tv = frame[self_.input_symbol()]; + auto sink_tv = frame[self_.node_symbol()]; + + // It is possible that source or sink vertex is Null due to optional + // matching. + if (source_tv.IsNull() || sink_tv.IsNull()) continue; + + auto source = source_tv.ValueVertex(); + auto sink = sink_tv.ValueVertex(); + + int64_t lower_bound = + self_.lower_bound() + ? EvaluateInt(&evaluator, self_.lower_bound(), + "Min depth in breadth-first expansion") + : 1; + int64_t upper_bound = + self_.upper_bound() + ? EvaluateInt(&evaluator, self_.upper_bound(), + "Max depth in breadth-first expansion") + : std::numeric_limits::max(); + + if (upper_bound < 1 || lower_bound > upper_bound) continue; + + if (FindPath(source, sink, lower_bound, upper_bound, &frame, + &evaluator)) { + return true; + } + } + return false; + } + + void Reset() override { input_cursor_->Reset(); } + + private: + const ExpandVariable &self_; + std::unique_ptr input_cursor_; + + using VertexEdgeMapT = + std::unordered_map>; + + void ReconstructPath(const VertexAccessor &midpoint, + const VertexEdgeMapT &in_edge, + const VertexEdgeMapT &out_edge, Frame *frame) { + std::vector result; + auto last_vertex = midpoint; + while (true) { + const auto &last_edge = in_edge.at(last_vertex); + if (!last_edge) break; + last_vertex = + last_edge->from_is(last_vertex) ? last_edge->to() : last_edge->from(); + result.emplace_back(*last_edge); + } + std::reverse(result.begin(), result.end()); + last_vertex = midpoint; + while (true) { + const auto &last_edge = out_edge.at(last_vertex); + if (!last_edge) break; + last_vertex = + last_edge->from_is(last_vertex) ? last_edge->to() : last_edge->from(); + result.emplace_back(*last_edge); + } + frame->at(self_.edge_symbol()) = std::move(result); + } + + bool ShouldExpand(const VertexAccessor &vertex, const EdgeAccessor &edge, + Frame *frame, ExpressionEvaluator *evaluator) { + if (!self_.filter_lambda().expression) return true; + + frame->at(self_.filter_lambda().inner_node_symbol) = vertex; + frame->at(self_.filter_lambda().inner_edge_symbol) = edge; + + TypedValue result = self_.filter_lambda().expression->Accept(*evaluator); + if (result.IsNull()) return false; + if (result.IsBool()) return result.ValueBool(); + + throw QueryRuntimeException( + "Expansion condition must evaluate to boolean or null"); + } + + bool FindPath(const VertexAccessor &source, const VertexAccessor &sink, + int64_t lower_bound, int64_t upper_bound, Frame *frame, + ExpressionEvaluator *evaluator) { + using utils::Contains; + + if (source == sink) return false; + + // We expand from both directions, both from the source and the sink. + // Expansions meet at the middle of the path if it exists. This should + // perform better for real-world like graphs where the expansion front + // grows exponentially, effectively reducing the exponent by half. + + // Holds vertices at the current level of expansion from the source + // (sink). + std::vector source_frontier; + std::vector sink_frontier; + + // Holds vertices we can expand to from `source_frontier` + // (`sink_frontier`). + std::vector source_next; + std::vector sink_next; + + // Maps each vertex we visited expanding from the source (sink) to the + // edge used. Necessary for path reconstruction. + VertexEdgeMapT in_edge; + VertexEdgeMapT out_edge; + + size_t current_length = 0; + + source_frontier.emplace_back(source); + in_edge[source] = std::experimental::nullopt; + sink_frontier.emplace_back(sink); + out_edge[sink] = std::experimental::nullopt; + + while (true) { + // Top-down step (expansion from the source). + ++current_length; + if (current_length > upper_bound) return false; + + for (const auto &vertex : source_frontier) { + if (self_.direction() != EdgeAtom::Direction::IN) { + for (const auto &edge : vertex.out(&self_.edge_types())) { + if (ShouldExpand(edge.to(), edge, frame, evaluator) && + !Contains(in_edge, edge.to())) { + in_edge.emplace(edge.to(), edge); + if (Contains(out_edge, edge.to())) { + if (current_length >= lower_bound) { + ReconstructPath(edge.to(), in_edge, out_edge, frame); + return true; + } else { + return false; + } + } + source_next.push_back(edge.to()); + } + } + } + if (self_.direction() != EdgeAtom::Direction::OUT) { + for (const auto &edge : vertex.in(&self_.edge_types())) { + if (ShouldExpand(edge.from(), edge, frame, evaluator) && + !Contains(in_edge, edge.from())) { + in_edge.emplace(edge.from(), edge); + if (Contains(out_edge, edge.from())) { + if (current_length >= lower_bound) { + ReconstructPath(edge.from(), in_edge, out_edge, frame); + return true; + } else { + return false; + } + } + source_next.push_back(edge.from()); + } + } + } + } + + if (source_next.empty()) return false; + source_frontier.clear(); + std::swap(source_frontier, source_next); + + // Bottom-up step (expansion from the sink). + ++current_length; + if (current_length > upper_bound) return false; + + // When expanding from the sink we have to be careful which edge + // endpoint we pass to `should_expand`, because everything is + // reversed. + for (const auto &vertex : sink_frontier) { + if (self_.direction() != EdgeAtom::Direction::OUT) { + for (const auto &edge : vertex.out(&self_.edge_types())) { + if (ShouldExpand(vertex, edge, frame, evaluator) && + !Contains(out_edge, edge.to())) { + out_edge.emplace(edge.to(), edge); + if (Contains(in_edge, edge.to())) { + if (current_length >= lower_bound) { + ReconstructPath(edge.to(), in_edge, out_edge, frame); + return true; + } else { + return false; + } + } + sink_next.push_back(edge.to()); + } + } + } + if (self_.direction() != EdgeAtom::Direction::IN) { + for (const auto &edge : vertex.in(&self_.edge_types())) { + if (ShouldExpand(vertex, edge, frame, evaluator) && + !Contains(out_edge, edge.from())) { + out_edge.emplace(edge.from(), edge); + if (Contains(in_edge, edge.from())) { + if (current_length >= lower_bound) { + ReconstructPath(edge.from(), in_edge, out_edge, frame); + return true; + } else { + return false; + } + } + sink_next.push_back(edge.from()); + } + } + } + } + + if (sink_next.empty()) return false; + sink_frontier.clear(); + std::swap(sink_frontier, sink_next); + } + } +}; + +class SingleSourceShortestPathCursor : public query::plan::Cursor { + public: + SingleSourceShortestPathCursor(const ExpandVariable &self, + database::GraphDbAccessor &db) + : self_(self), input_cursor_(self_.input()->MakeCursor(db)) { + CHECK(self_.graph_view() == GraphView::OLD) + << "ExpandVariable should only be planned with GraphView::OLD"; + CHECK(!self_.existing_node()) << "Single source shortest path algorithm " + "should not be used when `existing_node` " + "flag is set, s-t shortest path algorithm " + "should be used instead!"; + } + + bool Pull(Frame &frame, Context &context) override { + ExpressionEvaluator evaluator(frame, &context, GraphView::OLD); // for the given (edge, vertex) pair checks if they satisfy the // "where" condition. if so, places them in the to_visit_ structure. @@ -1035,14 +1268,11 @@ class ExpandBfsCursor : public query::plan::Cursor { // if we already processed the given vertex it doesn't get expanded if (processed_.find(vertex) != processed_.end()) return; - SwitchAccessor(edge, self_.graph_view_); - SwitchAccessor(vertex, self_.graph_view_); + frame[self_.filter_lambda().inner_edge_symbol] = edge; + frame[self_.filter_lambda().inner_node_symbol] = vertex; - frame[self_.filter_lambda_.inner_edge_symbol] = edge; - frame[self_.filter_lambda_.inner_node_symbol] = vertex; - - if (self_.filter_lambda_.expression) { - TypedValue result = self_.filter_lambda_.expression->Accept(evaluator); + if (self_.filter_lambda().expression) { + TypedValue result = self_.filter_lambda().expression->Accept(evaluator); switch (result.type()) { case TypedValue::Type::Null: return; @@ -1062,12 +1292,12 @@ class ExpandBfsCursor : public query::plan::Cursor { // from the given vertex. skips expansions that don't satisfy // the "where" condition. auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) { - if (self_.direction_ != EdgeAtom::Direction::IN) { - for (const EdgeAccessor &edge : vertex.out(&self_.edge_types_)) + if (self_.direction() != EdgeAtom::Direction::IN) { + for (const EdgeAccessor &edge : vertex.out(&self_.edge_types())) expand_pair(edge, edge.to()); } - if (self_.direction_ != EdgeAtom::Direction::OUT) { - for (const EdgeAccessor &edge : vertex.in(&self_.edge_types_)) + if (self_.direction() != EdgeAtom::Direction::OUT) { + for (const EdgeAccessor &edge : vertex.in(&self_.edge_types())) expand_pair(edge, edge.from()); } }; @@ -1079,28 +1309,26 @@ class ExpandBfsCursor : public query::plan::Cursor { // if current is still empty, it means both are empty, so pull from // input - if (skip_rest_ || to_visit_current_.empty()) { + if (to_visit_current_.empty()) { if (!input_cursor_->Pull(frame, context)) return false; to_visit_current_.clear(); to_visit_next_.clear(); processed_.clear(); - auto vertex_value = frame[self_.input_symbol_]; + auto vertex_value = frame[self_.input_symbol()]; // it is possible that the vertex is Null due to optional matching if (vertex_value.IsNull()) continue; auto vertex = vertex_value.Value(); - SwitchAccessor(vertex, self_.graph_view_); processed_.emplace(vertex, std::experimental::nullopt); expand_from_vertex(vertex); - lower_bound_ = self_.lower_bound_ - ? EvaluateInt(&evaluator, self_.lower_bound_, + lower_bound_ = self_.lower_bound() + ? EvaluateInt(&evaluator, self_.lower_bound(), "Min depth in breadth-first expansion") : 1; - upper_bound_ = self_.upper_bound_ - ? EvaluateInt(&evaluator, self_.upper_bound_, + upper_bound_ = self_.upper_bound() + ? EvaluateInt(&evaluator, self_.upper_bound(), "Max depth in breadth-first expansion") - : std::numeric_limits::max(); - skip_rest_ = false; + : std::numeric_limits::max(); if (upper_bound_ < 1) throw QueryRuntimeException( "Max depth in breadth-first expansion must be greater then " @@ -1112,8 +1340,8 @@ class ExpandBfsCursor : public query::plan::Cursor { // take the next expansion from the queue std::pair expansion = - to_visit_current_.front(); - to_visit_current_.pop_front(); + to_visit_current_.back(); + to_visit_current_.pop_back(); // create the frame value for the edges std::vector edge_list{expansion.first}; @@ -1130,25 +1358,16 @@ class ExpandBfsCursor : public query::plan::Cursor { } // expand only if what we've just expanded is less then max depth - if (static_cast(edge_list.size()) < upper_bound_) + if (static_cast(edge_list.size()) < upper_bound_) expand_from_vertex(expansion.second); if (static_cast(edge_list.size()) < lower_bound_) continue; - // place destination node on the frame, handle existence flag - if (self_.existing_node_) { - TypedValue &node = frame[self_.node_symbol_]; - // due to optional matching the existing node could be null - if (node.IsNull() || (node != expansion.second).Value()) continue; - // there is no point in traversing the rest of the graph because bfs - // can find only one path to a certain node - skip_rest_ = true; - } else - frame[self_.node_symbol_] = expansion.second; + frame[self_.node_symbol()] = expansion.second; // place edges on the frame in the correct order std::reverse(edge_list.begin(), edge_list.end()); - frame[self_.edge_symbol_] = std::move(edge_list); + frame[self_.edge_symbol()] = std::move(edge_list); return true; } @@ -1164,13 +1383,10 @@ class ExpandBfsCursor : public query::plan::Cursor { const ExpandVariable &self_; const std::unique_ptr input_cursor_; - // Depth bounds. Calculated on each pull from the input, the initial value is - // irrelevant. - int lower_bound_{-1}; - int upper_bound_{-1}; - - // when set to true, expansion is restarted from a new source - bool skip_rest_{false}; + // Depth bounds. Calculated on each pull from the input, the initial value + // is irrelevant. + int64_t lower_bound_{-1}; + int64_t upper_bound_{-1}; // maps vertices to the edge they got expanded from. it is an optional // edge because the root does not get expanded from anything. @@ -1178,8 +1394,8 @@ class ExpandBfsCursor : public query::plan::Cursor { std::unordered_map> processed_; // edge/vertex pairs we have yet to visit, for current and next depth - std::deque> to_visit_current_; - std::deque> to_visit_next_; + std::vector> to_visit_current_; + std::vector> to_visit_next_; }; class ExpandWeightedShortestPathCursor : public query::plan::Cursor { @@ -1195,7 +1411,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { }; // For the given (edge, vertex, weight, depth) tuple checks if they - // satisfy the "where" condition. if so, places them in the priority queue. + // satisfy the "where" condition. if so, places them in the priority + // queue. auto expand_pair = [this, &evaluator, &frame, &create_state]( EdgeAccessor edge, VertexAccessor vertex, double weight, int depth) { @@ -1269,7 +1486,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { "Max depth in weighted shortest path expansion"); upper_bound_set_ = true; } else { - upper_bound_ = std::numeric_limits::max(); + upper_bound_ = std::numeric_limits::max(); upper_bound_set_ = false; } if (upper_bound_ < 1) @@ -1370,7 +1587,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { const std::unique_ptr input_cursor_; // Upper bound on the path length. - int upper_bound_{-1}; + int64_t upper_bound_{-1}; bool upper_bound_set_{false}; struct WspStateHash { @@ -1418,12 +1635,20 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor { std::unique_ptr ExpandVariable::MakeCursor( database::GraphDbAccessor &db) const { - if (type_ == EdgeAtom::Type::BREADTH_FIRST) { - return std::make_unique(*this, db); - } else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) { - return std::make_unique(*this, db); - } else { - return std::make_unique(*this, db); + switch (type_) { + case EdgeAtom::Type::BREADTH_FIRST: + if (existing_node_) { + return std::make_unique(*this, db); + } else { + return std::make_unique(*this, db); + } + case EdgeAtom::Type::DEPTH_FIRST: + return std::make_unique(*this, db); + case EdgeAtom::Type::WEIGHTED_SHORTEST_PATH: + return std::make_unique(*this, db); + case EdgeAtom::Type::SINGLE: + LOG(FATAL) + << "ExpandVariable should not be planned for a single expansion!"; } } @@ -2400,9 +2625,9 @@ Skip::SkipCursor::SkipCursor(const Skip &self, database::GraphDbAccessor &db) bool Skip::SkipCursor::Pull(Frame &frame, Context &context) { while (input_cursor_->Pull(frame, context)) { if (to_skip_ == -1) { - // First successful pull from the input, evaluate the skip expression. The - // skip expression doesn't contain identifiers so graph view parameter is - // not important. + // First successful pull from the input, evaluate the skip expression. + // The skip expression doesn't contain identifiers so graph view + // parameter is not important. ExpressionEvaluator evaluator(frame, &context, GraphView::OLD); TypedValue to_skip = self_.expression_->Accept(evaluator); if (to_skip.type() != TypedValue::Type::Int) diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp index ce9f7da11..01733a549 100644 --- a/src/query/plan/operator.lcp +++ b/src/query/plan/operator.lcp @@ -937,7 +937,6 @@ pulled.") // it's edges_ and edges_it_ are decltyped using a helper function // that should be inaccessible (private class function won't compile) friend class ExpandVariableCursor; - friend class ExpandBfsCursor; friend class ExpandWeightedShortestPathCursor; ExpandVariable() {} diff --git a/src/utils/algorithm.hpp b/src/utils/algorithm.hpp index 1c1910236..1c493e49f 100644 --- a/src/utils/algorithm.hpp +++ b/src/utils/algorithm.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -98,6 +99,12 @@ inline bool Contains(const std::unordered_set &iterable, return iterable.find(element) != iterable.end(); } +template +inline bool Contains(const std::unordered_map &iterable, + const TKey &key) { + return iterable.find(key) != iterable.end(); +} + /** * Returns `true` if the given iterable contains the given element. * diff --git a/tests/macro_benchmark/clients/bfs_pokec_client.cpp b/tests/macro_benchmark/clients/bfs_pokec_client.cpp index cae677cab..8c54f2085 100644 --- a/tests/macro_benchmark/clients/bfs_pokec_client.cpp +++ b/tests/macro_benchmark/clients/bfs_pokec_client.cpp @@ -51,8 +51,9 @@ class BfsPokecClient : public TestClient { } if (FLAGS_db == "memgraph") { auto result = Execute( - "MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User {id: $end}) " - "RETURN nodes(p) AS path LIMIT 1", + "MATCH (n:User {id: $start}), (m:User {id: $end}), " + "p = (n)-[*bfs..15]->(m) " + "RETURN extract(n in nodes(p) | n.id) AS path", {{"start", start}, {"end", end}}, "Bfs"); CHECK(result) << "Read-only query should not fail!"; } else if (FLAGS_db == "neo4j") { @@ -70,7 +71,7 @@ class BfsPokecClient : public TestClient { if (FLAGS_db == "memgraph") { auto result = Execute( "MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User) WHERE m != n " - "RETURN nodes(p) AS path", + "RETURN extract(n in nodes(p) | n.id) AS path", {{"start", start}}, "Bfs"); CHECK(result) << "Read-only query should not fail!"; } else { diff --git a/tests/macro_benchmark/run_bfs_pokec b/tests/macro_benchmark/run_bfs_pokec index 8b0a2371a..9cb18b219 100755 --- a/tests/macro_benchmark/run_bfs_pokec +++ b/tests/macro_benchmark/run_bfs_pokec @@ -20,4 +20,3 @@ mv .harness_summary ${script_dir}/.results/bfs_pokec/memgraph_bfs_2.summary ./harness LongRunningSuite NeoRunner --groups bfs_pokec --workload without_destination_node mv .harness_summary ${script_dir}/.results/bfs_pokec/neo4j_bfs_2.summary - diff --git a/tests/unit/query_plan_common.hpp b/tests/unit/query_plan_common.hpp index f0731ce68..2712b60a3 100644 --- a/tests/unit/query_plan_common.hpp +++ b/tests/unit/query_plan_common.hpp @@ -175,6 +175,20 @@ ExpandTuple MakeExpand(AstStorage &storage, SymbolTable &symbol_table, return ExpandTuple{edge, edge_sym, node, node_sym, op}; } +struct UnwindTuple { + Symbol sym_; + std::shared_ptr op_; +}; + +UnwindTuple MakeUnwind(SymbolTable &symbol_table, + const std::string &symbol_name, + std::shared_ptr input, + Expression *input_expression) { + auto sym = symbol_table.CreateSymbol(symbol_name, true); + auto op = std::make_shared(input, input_expression, sym); + return UnwindTuple{sym, op}; +} + template auto CountIterable(TIterable iterable) { return std::distance(iterable.begin(), iterable.end()); diff --git a/tests/unit/query_plan_match_filter_return.cpp b/tests/unit/query_plan_match_filter_return.cpp index def0cdb3a..0a4c916d0 100644 --- a/tests/unit/query_plan_match_filter_return.cpp +++ b/tests/unit/query_plan_match_filter_return.cpp @@ -7,6 +7,8 @@ #include #include "cppitertools/enumerate.hpp" +#include "cppitertools/product.hpp" +#include "cppitertools/range.hpp" #include "cppitertools/repeat.hpp" #include "gmock/gmock.h" #include "gtest/gtest.h" @@ -829,6 +831,266 @@ struct hash> { }; } // namespace std +std::vector> FloydWarshall( + int num_vertices, const std::vector> &edges, + EdgeAtom::Direction dir) { + auto has_edge = [&](int u, int v) -> bool { + bool res = false; + if (dir != EdgeAtom::Direction::IN) + res |= utils::Contains(edges, std::make_pair(u, v)); + if (dir != EdgeAtom::Direction::OUT) + res |= utils::Contains(edges, std::make_pair(v, u)); + return res; + }; + + int inf = std::numeric_limits::max(); + std::vector> dist(num_vertices, + std::vector(num_vertices, inf)); + + for (int i = 0; i < num_vertices; ++i) + for (int j = 0; j < num_vertices; ++j) + if (has_edge(i, j)) dist[i][j] = 1; + for (int i = 0; i < num_vertices; ++i) dist[i][i] = 0; + + for (int k = 0; k < num_vertices; ++k) { + for (int i = 0; i < num_vertices; ++i) { + for (int j = 0; j < num_vertices; ++j) { + if (dist[i][k] == inf || dist[k][j] == inf) continue; + dist[i][j] = std::min(dist[i][j], dist[i][k] + dist[k][j]); + } + } + } + + for (int i = 0; i < num_vertices; ++i) + for (int j = 0; j < num_vertices; ++j) + if (dist[i][j] == inf) dist[i][j] = -1; + + return dist; +} + +class STShortestPathTest : public ::testing::Test { + protected: + STShortestPathTest() : db(), dba_ptr(db.Access()), dba(*dba_ptr) {} + + void SetUp() { + for (int i = 0; i < NUM_VERTICES; ++i) { + vertices.emplace_back(dba.InsertVertex()); + vertices[i].PropsSet(dba.Property("id"), i); + } + for (auto edge : EDGES) { + edges.emplace_back(dba.InsertEdge( + vertices[edge.first], vertices[edge.second], dba.EdgeType("Edge"))); + edges.back().PropsSet(dba.Property("id"), + fmt::format("{}-{}", edge.first, edge.second)); + } + + dba.AdvanceCommand(); + + ASSERT_EQ(dba.VerticesCount(), NUM_VERTICES); + ASSERT_EQ(dba.EdgesCount(), EDGES.size()); + } + + std::vector> ShortestPaths( + std::shared_ptr input_cursor, + Symbol source_symbol, Symbol sink_symbol, EdgeAtom::Direction dir, + Expression *lower_bound = nullptr, Expression *upper_bound = nullptr, + std::experimental::optional expand_lambda = + std::experimental::nullopt) { + if (!expand_lambda) { + expand_lambda = ExpandVariable::Lambda{ + symbol_table.CreateSymbol("inner_edge", true), + symbol_table.CreateSymbol("inner_node", true), nullptr}; + } + + auto edges_symbol = symbol_table.CreateSymbol("edges_symbol", true); + + auto expand_variable = std::make_shared( + sink_symbol, edges_symbol, EdgeAtom::Type::BREADTH_FIRST, dir, + std::vector{dba.EdgeType("Edge")}, false, + lower_bound, upper_bound, input_cursor, source_symbol, true, + *expand_lambda, std::experimental::nullopt, std::experimental::nullopt, + GraphView::OLD); + + auto source_output_symbol = + symbol_table.CreateSymbol("s", true, Symbol::Type::Vertex); + auto sink_output_symbol = + symbol_table.CreateSymbol("t", true, Symbol::Type::Vertex); + auto edges_output_symbol = + symbol_table.CreateSymbol("edge", true, Symbol::Type::EdgeList); + + auto source_id = IDENT("s"); + auto sink_id = IDENT("t"); + auto edges_id = IDENT("e"); + + symbol_table[*source_id] = source_symbol; + symbol_table[*sink_id] = sink_symbol; + symbol_table[*edges_id] = edges_symbol; + + auto source_ne = NEXPR("s", source_id); + auto sink_ne = NEXPR("s", sink_id); + auto edges_ne = NEXPR("e", edges_id); + + symbol_table[*source_ne] = source_output_symbol; + symbol_table[*sink_ne] = sink_output_symbol; + symbol_table[*edges_ne] = edges_output_symbol; + + auto produce = MakeProduce(expand_variable, source_ne, sink_ne, edges_ne); + return CollectProduce(produce.get(), symbol_table, dba); + } + + void CheckPath(const VertexAccessor &source, const VertexAccessor &sink, + EdgeAtom::Direction dir, const std::vector &path) { + // Check that the given path is actually a path from source to sink, that + // expansion direction is correct and that given edges actually exist in the + // test graph + VertexAccessor curr = source; + for (const auto &edge_tv : path) { + EXPECT_TRUE(edge_tv.IsEdge()); + auto edge = edge_tv.ValueEdge(); + EXPECT_TRUE(edge.from() == curr || edge.to() == curr); + EXPECT_TRUE(curr == edge.from() || dir != EdgeAtom::Direction::OUT); + EXPECT_TRUE(curr == edge.to() || dir != EdgeAtom::Direction::IN); + int from = edge.from().PropsAt(dba.Property("id")).Value(); + int to = edge.to().PropsAt(dba.Property("id")).Value(); + EXPECT_TRUE(utils::Contains(EDGES, std::make_pair(from, to))); + curr = curr == edge.from() ? edge.to() : edge.from(); + } + EXPECT_EQ(curr, sink); + } + + database::SingleNode db; + std::unique_ptr dba_ptr; + database::GraphDbAccessor &dba; + std::vector vertices; + std::vector edges; + + AstStorage storage; + SymbolTable symbol_table; + + const int NUM_VERTICES = 6; + const std::vector> EDGES = { + {0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}}; +}; + +TEST_F(STShortestPathTest, DirectionAndExpansionDepth) { + auto lower_bounds = iter::range(-1, NUM_VERTICES + 1); + auto upper_bounds = iter::range(-1, NUM_VERTICES + 1); + auto directions = std::vector{EdgeAtom::Direction::IN, + EdgeAtom::Direction::OUT, + EdgeAtom::Direction::BOTH}; + + for (const auto &test : + iter::product(lower_bounds, upper_bounds, directions)) { + int lower_bound; + int upper_bound; + EdgeAtom::Direction dir; + std::tie(lower_bound, upper_bound, dir) = test; + + auto dist = FloydWarshall(NUM_VERTICES, EDGES, dir); + + auto source = MakeScanAll(storage, symbol_table, "s"); + auto sink = MakeScanAll(storage, symbol_table, "t", source.op_); + + auto results = + ShortestPaths(sink.op_, source.sym_, sink.sym_, dir, + lower_bound == -1 ? nullptr : LITERAL(lower_bound), + upper_bound == -1 ? nullptr : LITERAL(upper_bound)); + + if (lower_bound == -1) lower_bound = 0; + if (upper_bound == -1) upper_bound = NUM_VERTICES; + size_t output_count = 0; + for (int i = 0; i < NUM_VERTICES; ++i) { + for (int j = 0; j < NUM_VERTICES; ++j) { + if (i != j && dist[i][j] != -1 && dist[i][j] >= lower_bound && + dist[i][j] <= upper_bound) + ++output_count; + } + } + + EXPECT_EQ(results.size(), output_count); + + for (const auto &result : results) { + int s = + result[0].ValueVertex().PropsAt(dba.Property("id")).Value(); + int t = + result[1].ValueVertex().PropsAt(dba.Property("id")).Value(); + EXPECT_EQ(dist[s][t], (int)result[2].ValueList().size()); + CheckPath(result[0].ValueVertex(), result[1].ValueVertex(), dir, + result[2].ValueList()); + } + } +} + +TEST_F(STShortestPathTest, ExpandLambda) { + Symbol inner_node_symbol = symbol_table.CreateSymbol("inner_node", true); + Symbol inner_edge_symbol = symbol_table.CreateSymbol("inner_edge", true); + auto inner_node = IDENT("inner_node"); + auto inner_edge = IDENT("inner_edge"); + + symbol_table[*inner_node] = inner_node_symbol; + symbol_table[*inner_edge] = inner_edge_symbol; + + // (filter expression, expected shortest path length) + std::vector> tests = { + // Block vertex 1 (this stops expansion from source side) + {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(1)), -1}, + // Block vertex 5 (this stops expansion from sink side) + {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(5)), -1}, + // Block source vertex + {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(0)), 4}, + // Block sink vertex + {NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(3)), -1}, + // Block edge 0-1 (this stops expansion from source side) + {NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("0-1")), + -1}, + // Block edge 5-3 (this stops expansion from sink side) + {NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("5-3")), + -1}, + // Block edges 2-5 and 4-1 + {AND(NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("2-5")), + NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), + LITERAL("4-1"))), + 5}}; + + for (auto test : tests) { + Expression *expression; + int length; + + std::tie(expression, length) = test; + + auto source = + MakeUnwind(symbol_table, "s", nullptr, LIST(LITERAL(vertices[0]))); + auto sink = + MakeUnwind(symbol_table, "t", source.op_, LIST(LITERAL(vertices[3]))); + auto results = + ShortestPaths(sink.op_, source.sym_, sink.sym_, + EdgeAtom::Direction::BOTH, nullptr, nullptr, + ExpandVariable::Lambda{inner_edge_symbol, + inner_node_symbol, expression}); + + if (length == -1) { + EXPECT_EQ(results.size(), 0); + } else { + ASSERT_EQ(results.size(), 1); + EXPECT_EQ(results[0][2].ValueList().size(), length); + } + } +} + +TEST_F(STShortestPathTest, OptionalMatch) { + for (int i = 0; i <= 2; ++i) { + auto source = MakeUnwind( + symbol_table, "s", nullptr, + LIST(i == 0 ? LITERAL(vertices[0]) : LITERAL(TypedValue::Null))); + auto sink = MakeUnwind( + symbol_table, "t", source.op_, + LIST(i == 1 ? LITERAL(vertices[3]) : LITERAL(TypedValue::Null))); + auto results = ShortestPaths(sink.op_, source.sym_, sink.sym_, + EdgeAtom::Direction::BOTH); + EXPECT_EQ(results.size(), 0); + } +} + enum class TestType { SINGLE_NODE, DISTRIBUTED }; /** A test fixture for breadth first expansion */ @@ -906,8 +1168,7 @@ class QueryPlanExpandBfs // Defines and performs a breadth-first expansion with the given parameters. // Returns a vector of pairs. Each pair is (vector-of-edges, vertex). auto ExpandBF(EdgeAtom::Direction direction, int min_depth, int max_depth, - Expression *where, GraphView graph_view = GraphView::OLD, - const std::vector &sources = {}, + Expression *where, const std::vector &sources = {}, std::experimental::optional existing_node = std::experimental::nullopt) { auto source_sym = symbol_table.CreateSymbol("source", true); @@ -928,8 +1189,8 @@ class QueryPlanExpandBfs if (GetParam().first == TestType::DISTRIBUTED) { last_op = std::make_shared( node_sym, edge_list_sym, direction, std::vector{}, - last_op, source_sym, !!existing_node, graph_view, LITERAL(min_depth), - LITERAL(max_depth), + last_op, source_sym, !!existing_node, GraphView::OLD, + LITERAL(min_depth), LITERAL(max_depth), ExpandVariable::Lambda{inner_edge, inner_node, where}); } else { last_op = std::make_shared( @@ -938,7 +1199,8 @@ class QueryPlanExpandBfs LITERAL(max_depth), last_op, source_sym, static_cast(existing_node), ExpandVariable::Lambda{inner_edge, inner_node, where}, - std::experimental::nullopt, std::experimental::nullopt, graph_view); + std::experimental::nullopt, std::experimental::nullopt, + GraphView::OLD); } Frame frame(symbol_table.max_position()); @@ -990,7 +1252,7 @@ class QueryPlanExpandBfs TEST_P(QueryPlanExpandBfs, Basic) { auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, - GraphView::OLD, {VertexAccessor(v[0], dba)}); + {VertexAccessor(v[0], dba)}); ASSERT_EQ(results.size(), 5); @@ -1021,7 +1283,7 @@ TEST_P(QueryPlanExpandBfs, Basic) { TEST_P(QueryPlanExpandBfs, EdgeDirection) { { auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, - GraphView::OLD, {VertexAccessor(v[4], dba)}); + {VertexAccessor(v[4], dba)}); ASSERT_EQ(results.size(), 4); if (GetProp(results[0].second) == 5) { @@ -1047,7 +1309,7 @@ TEST_P(QueryPlanExpandBfs, EdgeDirection) { { auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, - GraphView::OLD, {VertexAccessor(v[4], dba)}); + {VertexAccessor(v[4], dba)}); ASSERT_EQ(results.size(), 4); if (GetProp(results[0].second) == 5) { @@ -1079,7 +1341,7 @@ TEST_P(QueryPlanExpandBfs, Where) { symbol_table[*ident] = inner_node; auto filter_expr = LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(4)); auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, - GraphView::OLD, {VertexAccessor(v[0], dba)}); + {VertexAccessor(v[0], dba)}); ASSERT_EQ(results.size(), 2); EXPECT_EQ(GetProp(results[0].second), 1); EXPECT_EQ(GetProp(results[1].second), 2); @@ -1089,7 +1351,7 @@ TEST_P(QueryPlanExpandBfs, Where) { auto filter_expr = AND(LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(50)), NEQ(PROPERTY_LOOKUP(ident, prop), LITERAL(12))); auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr, - GraphView::OLD, {VertexAccessor(v[0], dba)}); + {VertexAccessor(v[0], dba)}); ASSERT_EQ(results.size(), 4); EXPECT_EQ(GetProp(results[0].second), 1); EXPECT_EQ(GetProp(results[1].second), 4); @@ -1103,56 +1365,9 @@ TEST_P(QueryPlanExpandBfs, Where) { } } -TEST_P(QueryPlanExpandBfs, GraphState) { - auto ExpandSize = [this](GraphView graph_view) { - return ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, graph_view, - {VertexAccessor(v[0], dba)}) - .size(); - }; - - EXPECT_EQ(ExpandSize(GraphView::OLD), 5); - EXPECT_EQ(ExpandSize(GraphView::NEW), 5); - - { - auto from = VertexAccessor(v[0], dba); - auto to = dba.InsertVertex(); - v.push_back(to.GlobalAddress()); - - dba.InsertEdge(from, to, edge_type); - ApplyUpdates(dba.transaction_id()); - } - - EXPECT_EQ(ExpandSize(GraphView::OLD), 5); - EXPECT_EQ(ExpandSize(GraphView::NEW), 6); - - AdvanceCommand(dba.transaction_id()); - - EXPECT_EQ(ExpandSize(GraphView::OLD), 6); - EXPECT_EQ(ExpandSize(GraphView::NEW), 6); - - { - v.push_back(dba.InsertVertex().GlobalAddress()); - AdvanceCommand(dba.transaction_id()); - - auto from = VertexAccessor(v[4], dba); - auto to = VertexAccessor(v[7], dba); - - dba.InsertEdge(from, to, edge_type); - ApplyUpdates(dba.transaction_id()); - } - - EXPECT_EQ(ExpandSize(GraphView::OLD), 6); - EXPECT_EQ(ExpandSize(GraphView::NEW), 7); - - AdvanceCommand(dba.transaction_id()); - - EXPECT_EQ(ExpandSize(GraphView::OLD), 7); - EXPECT_EQ(ExpandSize(GraphView::NEW), 7); -} - TEST_P(QueryPlanExpandBfs, MultipleInputs) { auto results = - ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD, + ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {VertexAccessor(v[0], dba), VertexAccessor(v[3], dba)}); // Expect that each vertex has been returned 2 times. EXPECT_EQ(results.size(), 10); @@ -1162,6 +1377,8 @@ TEST_P(QueryPlanExpandBfs, MultipleInputs) { } TEST_P(QueryPlanExpandBfs, ExistingNode) { + // In single-node, this is handled by STShortestPath cursor instead of + // SingleSourceShortestPath cursor. using testing::ElementsAre; using testing::WhenSorted; @@ -1172,14 +1389,14 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) { { auto results = - ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD, + ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, {VertexAccessor(v[0], dba)}, VertexAccessor(v[3], dba)); EXPECT_EQ(results.size(), 1); EXPECT_EQ(GetProp(results[0].second), 3); } { - auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, - GraphView::OLD, sources, VertexAccessor(v[5], dba)); + auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, sources, + VertexAccessor(v[5], dba)); std::vector nodes; for (auto &row : results) { @@ -1189,8 +1406,8 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) { EXPECT_THAT(nodes, WhenSorted(ElementsAre(1, 2, 3, 4))); } { - auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, - GraphView::OLD, sources, VertexAccessor(v[5], dba)); + auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, sources, + VertexAccessor(v[5], dba)); std::vector nodes; for (auto &row : results) { @@ -1204,13 +1421,12 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) { TEST_P(QueryPlanExpandBfs, OptionalMatch) { { auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, - GraphView::OLD, {TypedValue::Null}); + {TypedValue::Null}); EXPECT_EQ(results.size(), 0); } { - auto results = - ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD, - {VertexAccessor(v[0], dba)}, TypedValue::Null); + auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, + {VertexAccessor(v[0], dba)}, TypedValue::Null); EXPECT_EQ(results.size(), 0); } } @@ -1218,7 +1434,7 @@ TEST_P(QueryPlanExpandBfs, OptionalMatch) { TEST_P(QueryPlanExpandBfs, ExpansionDepth) { { auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr, - GraphView::OLD, {VertexAccessor(v[0], dba)}); + {VertexAccessor(v[0], dba)}); EXPECT_EQ(results.size(), 3); if (GetProp(results[0].second) == 4) { std::swap(results[0], results[1]);