Implement single node two-sided BFS

Reviewers: mculinovic, teon.banek, ipaljak, buda, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1567
This commit is contained in:
Marin Tomic 2018-08-29 10:26:27 +02:00
parent 24e2b31367
commit 28ba872668
9 changed files with 602 additions and 136 deletions

View File

@ -18,6 +18,11 @@ class Frame {
return elems_[symbol.position()];
}
TypedValue &at(const Symbol &symbol) { return elems_.at(symbol.position()); }
const TypedValue &at(const Symbol &symbol) const {
return elems_.at(symbol.position());
}
auto &elems() { return elems_; }
private:

View File

@ -841,7 +841,7 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
upper_bound_ = self_.upper_bound()
? EvaluateInt(&evaluator, self_.upper_bound(),
"Max depth in breadth-first expansion")
: std::numeric_limits<int>::max();
: std::numeric_limits<int64_t>::max();
skip_rest_ = false;
if (upper_bound_ < 1) {
@ -870,8 +870,8 @@ class DistributedExpandBfsCursor : public query::plan::Cursor {
// Depth bounds. Calculated on each pull from the input, the initial value
// is irrelevant.
int lower_bound_{-1};
int upper_bound_{-1};
int64_t lower_bound_{-1};
int64_t upper_bound_{-1};
// When set to true, expansion is restarted from a new source.
bool skip_rest_{false};

View File

@ -1019,14 +1019,247 @@ class ExpandVariableCursor : public Cursor {
}
};
class ExpandBfsCursor : public query::plan::Cursor {
class STShortestPathCursor : public query::plan::Cursor {
public:
ExpandBfsCursor(const ExpandVariable &self, database::GraphDbAccessor &db)
: self_(self), input_cursor_(self_.input_->MakeCursor(db)) {}
STShortestPathCursor(const ExpandVariable &self,
database::GraphDbAccessor &dba)
: self_(self), input_cursor_(self_.input()->MakeCursor(dba)) {
CHECK(self_.graph_view() == GraphView::OLD)
<< "ExpandVariable should only be planned with GraphView::OLD";
CHECK(self_.existing_node()) << "s-t shortest path algorithm should only "
"be used when `existing_node` flag is "
"set!";
}
bool Pull(Frame &frame, Context &context) override {
// evaluator for the filtering condition
ExpressionEvaluator evaluator(frame, &context, self_.graph_view_);
ExpressionEvaluator evaluator(frame, &context, GraphView::OLD);
while (input_cursor_->Pull(frame, context)) {
auto source_tv = frame[self_.input_symbol()];
auto sink_tv = frame[self_.node_symbol()];
// It is possible that source or sink vertex is Null due to optional
// matching.
if (source_tv.IsNull() || sink_tv.IsNull()) continue;
auto source = source_tv.ValueVertex();
auto sink = sink_tv.ValueVertex();
int64_t lower_bound =
self_.lower_bound()
? EvaluateInt(&evaluator, self_.lower_bound(),
"Min depth in breadth-first expansion")
: 1;
int64_t upper_bound =
self_.upper_bound()
? EvaluateInt(&evaluator, self_.upper_bound(),
"Max depth in breadth-first expansion")
: std::numeric_limits<int64_t>::max();
if (upper_bound < 1 || lower_bound > upper_bound) continue;
if (FindPath(source, sink, lower_bound, upper_bound, &frame,
&evaluator)) {
return true;
}
}
return false;
}
void Reset() override { input_cursor_->Reset(); }
private:
const ExpandVariable &self_;
std::unique_ptr<query::plan::Cursor> input_cursor_;
using VertexEdgeMapT =
std::unordered_map<VertexAccessor,
std::experimental::optional<EdgeAccessor>>;
void ReconstructPath(const VertexAccessor &midpoint,
const VertexEdgeMapT &in_edge,
const VertexEdgeMapT &out_edge, Frame *frame) {
std::vector<TypedValue> result;
auto last_vertex = midpoint;
while (true) {
const auto &last_edge = in_edge.at(last_vertex);
if (!last_edge) break;
last_vertex =
last_edge->from_is(last_vertex) ? last_edge->to() : last_edge->from();
result.emplace_back(*last_edge);
}
std::reverse(result.begin(), result.end());
last_vertex = midpoint;
while (true) {
const auto &last_edge = out_edge.at(last_vertex);
if (!last_edge) break;
last_vertex =
last_edge->from_is(last_vertex) ? last_edge->to() : last_edge->from();
result.emplace_back(*last_edge);
}
frame->at(self_.edge_symbol()) = std::move(result);
}
bool ShouldExpand(const VertexAccessor &vertex, const EdgeAccessor &edge,
Frame *frame, ExpressionEvaluator *evaluator) {
if (!self_.filter_lambda().expression) return true;
frame->at(self_.filter_lambda().inner_node_symbol) = vertex;
frame->at(self_.filter_lambda().inner_edge_symbol) = edge;
TypedValue result = self_.filter_lambda().expression->Accept(*evaluator);
if (result.IsNull()) return false;
if (result.IsBool()) return result.ValueBool();
throw QueryRuntimeException(
"Expansion condition must evaluate to boolean or null");
}
bool FindPath(const VertexAccessor &source, const VertexAccessor &sink,
int64_t lower_bound, int64_t upper_bound, Frame *frame,
ExpressionEvaluator *evaluator) {
using utils::Contains;
if (source == sink) return false;
// We expand from both directions, both from the source and the sink.
// Expansions meet at the middle of the path if it exists. This should
// perform better for real-world like graphs where the expansion front
// grows exponentially, effectively reducing the exponent by half.
// Holds vertices at the current level of expansion from the source
// (sink).
std::vector<VertexAccessor> source_frontier;
std::vector<VertexAccessor> sink_frontier;
// Holds vertices we can expand to from `source_frontier`
// (`sink_frontier`).
std::vector<VertexAccessor> source_next;
std::vector<VertexAccessor> sink_next;
// Maps each vertex we visited expanding from the source (sink) to the
// edge used. Necessary for path reconstruction.
VertexEdgeMapT in_edge;
VertexEdgeMapT out_edge;
size_t current_length = 0;
source_frontier.emplace_back(source);
in_edge[source] = std::experimental::nullopt;
sink_frontier.emplace_back(sink);
out_edge[sink] = std::experimental::nullopt;
while (true) {
// Top-down step (expansion from the source).
++current_length;
if (current_length > upper_bound) return false;
for (const auto &vertex : source_frontier) {
if (self_.direction() != EdgeAtom::Direction::IN) {
for (const auto &edge : vertex.out(&self_.edge_types())) {
if (ShouldExpand(edge.to(), edge, frame, evaluator) &&
!Contains(in_edge, edge.to())) {
in_edge.emplace(edge.to(), edge);
if (Contains(out_edge, edge.to())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.to(), in_edge, out_edge, frame);
return true;
} else {
return false;
}
}
source_next.push_back(edge.to());
}
}
}
if (self_.direction() != EdgeAtom::Direction::OUT) {
for (const auto &edge : vertex.in(&self_.edge_types())) {
if (ShouldExpand(edge.from(), edge, frame, evaluator) &&
!Contains(in_edge, edge.from())) {
in_edge.emplace(edge.from(), edge);
if (Contains(out_edge, edge.from())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.from(), in_edge, out_edge, frame);
return true;
} else {
return false;
}
}
source_next.push_back(edge.from());
}
}
}
}
if (source_next.empty()) return false;
source_frontier.clear();
std::swap(source_frontier, source_next);
// Bottom-up step (expansion from the sink).
++current_length;
if (current_length > upper_bound) return false;
// When expanding from the sink we have to be careful which edge
// endpoint we pass to `should_expand`, because everything is
// reversed.
for (const auto &vertex : sink_frontier) {
if (self_.direction() != EdgeAtom::Direction::OUT) {
for (const auto &edge : vertex.out(&self_.edge_types())) {
if (ShouldExpand(vertex, edge, frame, evaluator) &&
!Contains(out_edge, edge.to())) {
out_edge.emplace(edge.to(), edge);
if (Contains(in_edge, edge.to())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.to(), in_edge, out_edge, frame);
return true;
} else {
return false;
}
}
sink_next.push_back(edge.to());
}
}
}
if (self_.direction() != EdgeAtom::Direction::IN) {
for (const auto &edge : vertex.in(&self_.edge_types())) {
if (ShouldExpand(vertex, edge, frame, evaluator) &&
!Contains(out_edge, edge.from())) {
out_edge.emplace(edge.from(), edge);
if (Contains(in_edge, edge.from())) {
if (current_length >= lower_bound) {
ReconstructPath(edge.from(), in_edge, out_edge, frame);
return true;
} else {
return false;
}
}
sink_next.push_back(edge.from());
}
}
}
}
if (sink_next.empty()) return false;
sink_frontier.clear();
std::swap(sink_frontier, sink_next);
}
}
};
class SingleSourceShortestPathCursor : public query::plan::Cursor {
public:
SingleSourceShortestPathCursor(const ExpandVariable &self,
database::GraphDbAccessor &db)
: self_(self), input_cursor_(self_.input()->MakeCursor(db)) {
CHECK(self_.graph_view() == GraphView::OLD)
<< "ExpandVariable should only be planned with GraphView::OLD";
CHECK(!self_.existing_node()) << "Single source shortest path algorithm "
"should not be used when `existing_node` "
"flag is set, s-t shortest path algorithm "
"should be used instead!";
}
bool Pull(Frame &frame, Context &context) override {
ExpressionEvaluator evaluator(frame, &context, GraphView::OLD);
// for the given (edge, vertex) pair checks if they satisfy the
// "where" condition. if so, places them in the to_visit_ structure.
@ -1035,14 +1268,11 @@ class ExpandBfsCursor : public query::plan::Cursor {
// if we already processed the given vertex it doesn't get expanded
if (processed_.find(vertex) != processed_.end()) return;
SwitchAccessor(edge, self_.graph_view_);
SwitchAccessor(vertex, self_.graph_view_);
frame[self_.filter_lambda().inner_edge_symbol] = edge;
frame[self_.filter_lambda().inner_node_symbol] = vertex;
frame[self_.filter_lambda_.inner_edge_symbol] = edge;
frame[self_.filter_lambda_.inner_node_symbol] = vertex;
if (self_.filter_lambda_.expression) {
TypedValue result = self_.filter_lambda_.expression->Accept(evaluator);
if (self_.filter_lambda().expression) {
TypedValue result = self_.filter_lambda().expression->Accept(evaluator);
switch (result.type()) {
case TypedValue::Type::Null:
return;
@ -1062,12 +1292,12 @@ class ExpandBfsCursor : public query::plan::Cursor {
// from the given vertex. skips expansions that don't satisfy
// the "where" condition.
auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) {
if (self_.direction_ != EdgeAtom::Direction::IN) {
for (const EdgeAccessor &edge : vertex.out(&self_.edge_types_))
if (self_.direction() != EdgeAtom::Direction::IN) {
for (const EdgeAccessor &edge : vertex.out(&self_.edge_types()))
expand_pair(edge, edge.to());
}
if (self_.direction_ != EdgeAtom::Direction::OUT) {
for (const EdgeAccessor &edge : vertex.in(&self_.edge_types_))
if (self_.direction() != EdgeAtom::Direction::OUT) {
for (const EdgeAccessor &edge : vertex.in(&self_.edge_types()))
expand_pair(edge, edge.from());
}
};
@ -1079,28 +1309,26 @@ class ExpandBfsCursor : public query::plan::Cursor {
// if current is still empty, it means both are empty, so pull from
// input
if (skip_rest_ || to_visit_current_.empty()) {
if (to_visit_current_.empty()) {
if (!input_cursor_->Pull(frame, context)) return false;
to_visit_current_.clear();
to_visit_next_.clear();
processed_.clear();
auto vertex_value = frame[self_.input_symbol_];
auto vertex_value = frame[self_.input_symbol()];
// it is possible that the vertex is Null due to optional matching
if (vertex_value.IsNull()) continue;
auto vertex = vertex_value.Value<VertexAccessor>();
SwitchAccessor(vertex, self_.graph_view_);
processed_.emplace(vertex, std::experimental::nullopt);
expand_from_vertex(vertex);
lower_bound_ = self_.lower_bound_
? EvaluateInt(&evaluator, self_.lower_bound_,
lower_bound_ = self_.lower_bound()
? EvaluateInt(&evaluator, self_.lower_bound(),
"Min depth in breadth-first expansion")
: 1;
upper_bound_ = self_.upper_bound_
? EvaluateInt(&evaluator, self_.upper_bound_,
upper_bound_ = self_.upper_bound()
? EvaluateInt(&evaluator, self_.upper_bound(),
"Max depth in breadth-first expansion")
: std::numeric_limits<int>::max();
skip_rest_ = false;
: std::numeric_limits<int64_t>::max();
if (upper_bound_ < 1)
throw QueryRuntimeException(
"Max depth in breadth-first expansion must be greater then "
@ -1112,8 +1340,8 @@ class ExpandBfsCursor : public query::plan::Cursor {
// take the next expansion from the queue
std::pair<EdgeAccessor, VertexAccessor> expansion =
to_visit_current_.front();
to_visit_current_.pop_front();
to_visit_current_.back();
to_visit_current_.pop_back();
// create the frame value for the edges
std::vector<TypedValue> edge_list{expansion.first};
@ -1130,25 +1358,16 @@ class ExpandBfsCursor : public query::plan::Cursor {
}
// expand only if what we've just expanded is less then max depth
if (static_cast<int>(edge_list.size()) < upper_bound_)
if (static_cast<int64_t>(edge_list.size()) < upper_bound_)
expand_from_vertex(expansion.second);
if (static_cast<int64_t>(edge_list.size()) < lower_bound_) continue;
// place destination node on the frame, handle existence flag
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
// due to optional matching the existing node could be null
if (node.IsNull() || (node != expansion.second).Value<bool>()) continue;
// there is no point in traversing the rest of the graph because bfs
// can find only one path to a certain node
skip_rest_ = true;
} else
frame[self_.node_symbol_] = expansion.second;
frame[self_.node_symbol()] = expansion.second;
// place edges on the frame in the correct order
std::reverse(edge_list.begin(), edge_list.end());
frame[self_.edge_symbol_] = std::move(edge_list);
frame[self_.edge_symbol()] = std::move(edge_list);
return true;
}
@ -1164,13 +1383,10 @@ class ExpandBfsCursor : public query::plan::Cursor {
const ExpandVariable &self_;
const std::unique_ptr<query::plan::Cursor> input_cursor_;
// Depth bounds. Calculated on each pull from the input, the initial value is
// irrelevant.
int lower_bound_{-1};
int upper_bound_{-1};
// when set to true, expansion is restarted from a new source
bool skip_rest_{false};
// Depth bounds. Calculated on each pull from the input, the initial value
// is irrelevant.
int64_t lower_bound_{-1};
int64_t upper_bound_{-1};
// maps vertices to the edge they got expanded from. it is an optional
// edge because the root does not get expanded from anything.
@ -1178,8 +1394,8 @@ class ExpandBfsCursor : public query::plan::Cursor {
std::unordered_map<VertexAccessor, std::experimental::optional<EdgeAccessor>>
processed_;
// edge/vertex pairs we have yet to visit, for current and next depth
std::deque<std::pair<EdgeAccessor, VertexAccessor>> to_visit_current_;
std::deque<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
std::vector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_current_;
std::vector<std::pair<EdgeAccessor, VertexAccessor>> to_visit_next_;
};
class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
@ -1195,7 +1411,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
};
// For the given (edge, vertex, weight, depth) tuple checks if they
// satisfy the "where" condition. if so, places them in the priority queue.
// satisfy the "where" condition. if so, places them in the priority
// queue.
auto expand_pair = [this, &evaluator, &frame, &create_state](
EdgeAccessor edge, VertexAccessor vertex,
double weight, int depth) {
@ -1269,7 +1486,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
"Max depth in weighted shortest path expansion");
upper_bound_set_ = true;
} else {
upper_bound_ = std::numeric_limits<int>::max();
upper_bound_ = std::numeric_limits<int64_t>::max();
upper_bound_set_ = false;
}
if (upper_bound_ < 1)
@ -1370,7 +1587,7 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
const std::unique_ptr<query::plan::Cursor> input_cursor_;
// Upper bound on the path length.
int upper_bound_{-1};
int64_t upper_bound_{-1};
bool upper_bound_set_{false};
struct WspStateHash {
@ -1418,12 +1635,20 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
std::unique_ptr<Cursor> ExpandVariable::MakeCursor(
database::GraphDbAccessor &db) const {
if (type_ == EdgeAtom::Type::BREADTH_FIRST) {
return std::make_unique<ExpandBfsCursor>(*this, db);
} else if (type_ == EdgeAtom::Type::WEIGHTED_SHORTEST_PATH) {
return std::make_unique<ExpandWeightedShortestPathCursor>(*this, db);
} else {
return std::make_unique<ExpandVariableCursor>(*this, db);
switch (type_) {
case EdgeAtom::Type::BREADTH_FIRST:
if (existing_node_) {
return std::make_unique<STShortestPathCursor>(*this, db);
} else {
return std::make_unique<SingleSourceShortestPathCursor>(*this, db);
}
case EdgeAtom::Type::DEPTH_FIRST:
return std::make_unique<ExpandVariableCursor>(*this, db);
case EdgeAtom::Type::WEIGHTED_SHORTEST_PATH:
return std::make_unique<ExpandWeightedShortestPathCursor>(*this, db);
case EdgeAtom::Type::SINGLE:
LOG(FATAL)
<< "ExpandVariable should not be planned for a single expansion!";
}
}
@ -2400,9 +2625,9 @@ Skip::SkipCursor::SkipCursor(const Skip &self, database::GraphDbAccessor &db)
bool Skip::SkipCursor::Pull(Frame &frame, Context &context) {
while (input_cursor_->Pull(frame, context)) {
if (to_skip_ == -1) {
// First successful pull from the input, evaluate the skip expression. The
// skip expression doesn't contain identifiers so graph view parameter is
// not important.
// First successful pull from the input, evaluate the skip expression.
// The skip expression doesn't contain identifiers so graph view
// parameter is not important.
ExpressionEvaluator evaluator(frame, &context, GraphView::OLD);
TypedValue to_skip = self_.expression_->Accept(evaluator);
if (to_skip.type() != TypedValue::Type::Int)

View File

@ -937,7 +937,6 @@ pulled.")
// it's edges_ and edges_it_ are decltyped using a helper function
// that should be inaccessible (private class function won't compile)
friend class ExpandVariableCursor;
friend class ExpandBfsCursor;
friend class ExpandWeightedShortestPathCursor;
ExpandVariable() {}

View File

@ -3,6 +3,7 @@
#include <algorithm>
#include <chrono>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@ -98,6 +99,12 @@ inline bool Contains(const std::unordered_set<TElement> &iterable,
return iterable.find(element) != iterable.end();
}
template <typename TKey, typename TValue>
inline bool Contains(const std::unordered_map<TKey, TValue> &iterable,
const TKey &key) {
return iterable.find(key) != iterable.end();
}
/**
* Returns `true` if the given iterable contains the given element.
*

View File

@ -51,8 +51,9 @@ class BfsPokecClient : public TestClient {
}
if (FLAGS_db == "memgraph") {
auto result = Execute(
"MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User {id: $end}) "
"RETURN nodes(p) AS path LIMIT 1",
"MATCH (n:User {id: $start}), (m:User {id: $end}), "
"p = (n)-[*bfs..15]->(m) "
"RETURN extract(n in nodes(p) | n.id) AS path",
{{"start", start}, {"end", end}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
} else if (FLAGS_db == "neo4j") {
@ -70,7 +71,7 @@ class BfsPokecClient : public TestClient {
if (FLAGS_db == "memgraph") {
auto result = Execute(
"MATCH p = (n:User {id: $start})-[*bfs..15]->(m:User) WHERE m != n "
"RETURN nodes(p) AS path",
"RETURN extract(n in nodes(p) | n.id) AS path",
{{"start", start}}, "Bfs");
CHECK(result) << "Read-only query should not fail!";
} else {

View File

@ -20,4 +20,3 @@ mv .harness_summary ${script_dir}/.results/bfs_pokec/memgraph_bfs_2.summary
./harness LongRunningSuite NeoRunner --groups bfs_pokec --workload without_destination_node
mv .harness_summary ${script_dir}/.results/bfs_pokec/neo4j_bfs_2.summary

View File

@ -175,6 +175,20 @@ ExpandTuple MakeExpand(AstStorage &storage, SymbolTable &symbol_table,
return ExpandTuple{edge, edge_sym, node, node_sym, op};
}
struct UnwindTuple {
Symbol sym_;
std::shared_ptr<LogicalOperator> op_;
};
UnwindTuple MakeUnwind(SymbolTable &symbol_table,
const std::string &symbol_name,
std::shared_ptr<LogicalOperator> input,
Expression *input_expression) {
auto sym = symbol_table.CreateSymbol(symbol_name, true);
auto op = std::make_shared<query::plan::Unwind>(input, input_expression, sym);
return UnwindTuple{sym, op};
}
template <typename TIterable>
auto CountIterable(TIterable iterable) {
return std::distance(iterable.begin(), iterable.end());

View File

@ -7,6 +7,8 @@
#include <fmt/format.h>
#include "cppitertools/enumerate.hpp"
#include "cppitertools/product.hpp"
#include "cppitertools/range.hpp"
#include "cppitertools/repeat.hpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
@ -829,6 +831,266 @@ struct hash<std::pair<int, int>> {
};
} // namespace std
std::vector<std::vector<int>> FloydWarshall(
int num_vertices, const std::vector<std::pair<int, int>> &edges,
EdgeAtom::Direction dir) {
auto has_edge = [&](int u, int v) -> bool {
bool res = false;
if (dir != EdgeAtom::Direction::IN)
res |= utils::Contains(edges, std::make_pair(u, v));
if (dir != EdgeAtom::Direction::OUT)
res |= utils::Contains(edges, std::make_pair(v, u));
return res;
};
int inf = std::numeric_limits<int>::max();
std::vector<std::vector<int>> dist(num_vertices,
std::vector<int>(num_vertices, inf));
for (int i = 0; i < num_vertices; ++i)
for (int j = 0; j < num_vertices; ++j)
if (has_edge(i, j)) dist[i][j] = 1;
for (int i = 0; i < num_vertices; ++i) dist[i][i] = 0;
for (int k = 0; k < num_vertices; ++k) {
for (int i = 0; i < num_vertices; ++i) {
for (int j = 0; j < num_vertices; ++j) {
if (dist[i][k] == inf || dist[k][j] == inf) continue;
dist[i][j] = std::min(dist[i][j], dist[i][k] + dist[k][j]);
}
}
}
for (int i = 0; i < num_vertices; ++i)
for (int j = 0; j < num_vertices; ++j)
if (dist[i][j] == inf) dist[i][j] = -1;
return dist;
}
class STShortestPathTest : public ::testing::Test {
protected:
STShortestPathTest() : db(), dba_ptr(db.Access()), dba(*dba_ptr) {}
void SetUp() {
for (int i = 0; i < NUM_VERTICES; ++i) {
vertices.emplace_back(dba.InsertVertex());
vertices[i].PropsSet(dba.Property("id"), i);
}
for (auto edge : EDGES) {
edges.emplace_back(dba.InsertEdge(
vertices[edge.first], vertices[edge.second], dba.EdgeType("Edge")));
edges.back().PropsSet(dba.Property("id"),
fmt::format("{}-{}", edge.first, edge.second));
}
dba.AdvanceCommand();
ASSERT_EQ(dba.VerticesCount(), NUM_VERTICES);
ASSERT_EQ(dba.EdgesCount(), EDGES.size());
}
std::vector<std::vector<TypedValue>> ShortestPaths(
std::shared_ptr<query::plan::LogicalOperator> input_cursor,
Symbol source_symbol, Symbol sink_symbol, EdgeAtom::Direction dir,
Expression *lower_bound = nullptr, Expression *upper_bound = nullptr,
std::experimental::optional<ExpandVariable::Lambda> expand_lambda =
std::experimental::nullopt) {
if (!expand_lambda) {
expand_lambda = ExpandVariable::Lambda{
symbol_table.CreateSymbol("inner_edge", true),
symbol_table.CreateSymbol("inner_node", true), nullptr};
}
auto edges_symbol = symbol_table.CreateSymbol("edges_symbol", true);
auto expand_variable = std::make_shared<ExpandVariable>(
sink_symbol, edges_symbol, EdgeAtom::Type::BREADTH_FIRST, dir,
std::vector<storage::EdgeType>{dba.EdgeType("Edge")}, false,
lower_bound, upper_bound, input_cursor, source_symbol, true,
*expand_lambda, std::experimental::nullopt, std::experimental::nullopt,
GraphView::OLD);
auto source_output_symbol =
symbol_table.CreateSymbol("s", true, Symbol::Type::Vertex);
auto sink_output_symbol =
symbol_table.CreateSymbol("t", true, Symbol::Type::Vertex);
auto edges_output_symbol =
symbol_table.CreateSymbol("edge", true, Symbol::Type::EdgeList);
auto source_id = IDENT("s");
auto sink_id = IDENT("t");
auto edges_id = IDENT("e");
symbol_table[*source_id] = source_symbol;
symbol_table[*sink_id] = sink_symbol;
symbol_table[*edges_id] = edges_symbol;
auto source_ne = NEXPR("s", source_id);
auto sink_ne = NEXPR("s", sink_id);
auto edges_ne = NEXPR("e", edges_id);
symbol_table[*source_ne] = source_output_symbol;
symbol_table[*sink_ne] = sink_output_symbol;
symbol_table[*edges_ne] = edges_output_symbol;
auto produce = MakeProduce(expand_variable, source_ne, sink_ne, edges_ne);
return CollectProduce(produce.get(), symbol_table, dba);
}
void CheckPath(const VertexAccessor &source, const VertexAccessor &sink,
EdgeAtom::Direction dir, const std::vector<TypedValue> &path) {
// Check that the given path is actually a path from source to sink, that
// expansion direction is correct and that given edges actually exist in the
// test graph
VertexAccessor curr = source;
for (const auto &edge_tv : path) {
EXPECT_TRUE(edge_tv.IsEdge());
auto edge = edge_tv.ValueEdge();
EXPECT_TRUE(edge.from() == curr || edge.to() == curr);
EXPECT_TRUE(curr == edge.from() || dir != EdgeAtom::Direction::OUT);
EXPECT_TRUE(curr == edge.to() || dir != EdgeAtom::Direction::IN);
int from = edge.from().PropsAt(dba.Property("id")).Value<int64_t>();
int to = edge.to().PropsAt(dba.Property("id")).Value<int64_t>();
EXPECT_TRUE(utils::Contains(EDGES, std::make_pair(from, to)));
curr = curr == edge.from() ? edge.to() : edge.from();
}
EXPECT_EQ(curr, sink);
}
database::SingleNode db;
std::unique_ptr<database::GraphDbAccessor> dba_ptr;
database::GraphDbAccessor &dba;
std::vector<VertexAccessor> vertices;
std::vector<EdgeAccessor> edges;
AstStorage storage;
SymbolTable symbol_table;
const int NUM_VERTICES = 6;
const std::vector<std::pair<int, int>> EDGES = {
{0, 1}, {1, 2}, {2, 4}, {2, 5}, {4, 1}, {4, 5}, {5, 4}, {5, 5}, {5, 3}};
};
TEST_F(STShortestPathTest, DirectionAndExpansionDepth) {
auto lower_bounds = iter::range(-1, NUM_VERTICES + 1);
auto upper_bounds = iter::range(-1, NUM_VERTICES + 1);
auto directions = std::vector<EdgeAtom::Direction>{EdgeAtom::Direction::IN,
EdgeAtom::Direction::OUT,
EdgeAtom::Direction::BOTH};
for (const auto &test :
iter::product(lower_bounds, upper_bounds, directions)) {
int lower_bound;
int upper_bound;
EdgeAtom::Direction dir;
std::tie(lower_bound, upper_bound, dir) = test;
auto dist = FloydWarshall(NUM_VERTICES, EDGES, dir);
auto source = MakeScanAll(storage, symbol_table, "s");
auto sink = MakeScanAll(storage, symbol_table, "t", source.op_);
auto results =
ShortestPaths(sink.op_, source.sym_, sink.sym_, dir,
lower_bound == -1 ? nullptr : LITERAL(lower_bound),
upper_bound == -1 ? nullptr : LITERAL(upper_bound));
if (lower_bound == -1) lower_bound = 0;
if (upper_bound == -1) upper_bound = NUM_VERTICES;
size_t output_count = 0;
for (int i = 0; i < NUM_VERTICES; ++i) {
for (int j = 0; j < NUM_VERTICES; ++j) {
if (i != j && dist[i][j] != -1 && dist[i][j] >= lower_bound &&
dist[i][j] <= upper_bound)
++output_count;
}
}
EXPECT_EQ(results.size(), output_count);
for (const auto &result : results) {
int s =
result[0].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>();
int t =
result[1].ValueVertex().PropsAt(dba.Property("id")).Value<int64_t>();
EXPECT_EQ(dist[s][t], (int)result[2].ValueList().size());
CheckPath(result[0].ValueVertex(), result[1].ValueVertex(), dir,
result[2].ValueList());
}
}
}
TEST_F(STShortestPathTest, ExpandLambda) {
Symbol inner_node_symbol = symbol_table.CreateSymbol("inner_node", true);
Symbol inner_edge_symbol = symbol_table.CreateSymbol("inner_edge", true);
auto inner_node = IDENT("inner_node");
auto inner_edge = IDENT("inner_edge");
symbol_table[*inner_node] = inner_node_symbol;
symbol_table[*inner_edge] = inner_edge_symbol;
// (filter expression, expected shortest path length)
std::vector<std::pair<Expression *, int>> tests = {
// Block vertex 1 (this stops expansion from source side)
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(1)), -1},
// Block vertex 5 (this stops expansion from sink side)
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(5)), -1},
// Block source vertex
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(0)), 4},
// Block sink vertex
{NEQ(PROPERTY_LOOKUP(inner_node, dba.Property("id")), LITERAL(3)), -1},
// Block edge 0-1 (this stops expansion from source side)
{NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("0-1")),
-1},
// Block edge 5-3 (this stops expansion from sink side)
{NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("5-3")),
-1},
// Block edges 2-5 and 4-1
{AND(NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")), LITERAL("2-5")),
NEQ(PROPERTY_LOOKUP(inner_edge, dba.Property("id")),
LITERAL("4-1"))),
5}};
for (auto test : tests) {
Expression *expression;
int length;
std::tie(expression, length) = test;
auto source =
MakeUnwind(symbol_table, "s", nullptr, LIST(LITERAL(vertices[0])));
auto sink =
MakeUnwind(symbol_table, "t", source.op_, LIST(LITERAL(vertices[3])));
auto results =
ShortestPaths(sink.op_, source.sym_, sink.sym_,
EdgeAtom::Direction::BOTH, nullptr, nullptr,
ExpandVariable::Lambda{inner_edge_symbol,
inner_node_symbol, expression});
if (length == -1) {
EXPECT_EQ(results.size(), 0);
} else {
ASSERT_EQ(results.size(), 1);
EXPECT_EQ(results[0][2].ValueList().size(), length);
}
}
}
TEST_F(STShortestPathTest, OptionalMatch) {
for (int i = 0; i <= 2; ++i) {
auto source = MakeUnwind(
symbol_table, "s", nullptr,
LIST(i == 0 ? LITERAL(vertices[0]) : LITERAL(TypedValue::Null)));
auto sink = MakeUnwind(
symbol_table, "t", source.op_,
LIST(i == 1 ? LITERAL(vertices[3]) : LITERAL(TypedValue::Null)));
auto results = ShortestPaths(sink.op_, source.sym_, sink.sym_,
EdgeAtom::Direction::BOTH);
EXPECT_EQ(results.size(), 0);
}
}
enum class TestType { SINGLE_NODE, DISTRIBUTED };
/** A test fixture for breadth first expansion */
@ -906,8 +1168,7 @@ class QueryPlanExpandBfs
// Defines and performs a breadth-first expansion with the given parameters.
// Returns a vector of pairs. Each pair is (vector-of-edges, vertex).
auto ExpandBF(EdgeAtom::Direction direction, int min_depth, int max_depth,
Expression *where, GraphView graph_view = GraphView::OLD,
const std::vector<TypedValue> &sources = {},
Expression *where, const std::vector<TypedValue> &sources = {},
std::experimental::optional<TypedValue> existing_node =
std::experimental::nullopt) {
auto source_sym = symbol_table.CreateSymbol("source", true);
@ -928,8 +1189,8 @@ class QueryPlanExpandBfs
if (GetParam().first == TestType::DISTRIBUTED) {
last_op = std::make_shared<DistributedExpandBfs>(
node_sym, edge_list_sym, direction, std::vector<storage::EdgeType>{},
last_op, source_sym, !!existing_node, graph_view, LITERAL(min_depth),
LITERAL(max_depth),
last_op, source_sym, !!existing_node, GraphView::OLD,
LITERAL(min_depth), LITERAL(max_depth),
ExpandVariable::Lambda{inner_edge, inner_node, where});
} else {
last_op = std::make_shared<ExpandVariable>(
@ -938,7 +1199,8 @@ class QueryPlanExpandBfs
LITERAL(max_depth), last_op, source_sym,
static_cast<bool>(existing_node),
ExpandVariable::Lambda{inner_edge, inner_node, where},
std::experimental::nullopt, std::experimental::nullopt, graph_view);
std::experimental::nullopt, std::experimental::nullopt,
GraphView::OLD);
}
Frame frame(symbol_table.max_position());
@ -990,7 +1252,7 @@ class QueryPlanExpandBfs
TEST_P(QueryPlanExpandBfs, Basic) {
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
GraphView::OLD, {VertexAccessor(v[0], dba)});
{VertexAccessor(v[0], dba)});
ASSERT_EQ(results.size(), 5);
@ -1021,7 +1283,7 @@ TEST_P(QueryPlanExpandBfs, Basic) {
TEST_P(QueryPlanExpandBfs, EdgeDirection) {
{
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr,
GraphView::OLD, {VertexAccessor(v[4], dba)});
{VertexAccessor(v[4], dba)});
ASSERT_EQ(results.size(), 4);
if (GetProp(results[0].second) == 5) {
@ -1047,7 +1309,7 @@ TEST_P(QueryPlanExpandBfs, EdgeDirection) {
{
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr,
GraphView::OLD, {VertexAccessor(v[4], dba)});
{VertexAccessor(v[4], dba)});
ASSERT_EQ(results.size(), 4);
if (GetProp(results[0].second) == 5) {
@ -1079,7 +1341,7 @@ TEST_P(QueryPlanExpandBfs, Where) {
symbol_table[*ident] = inner_node;
auto filter_expr = LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(4));
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr,
GraphView::OLD, {VertexAccessor(v[0], dba)});
{VertexAccessor(v[0], dba)});
ASSERT_EQ(results.size(), 2);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 2);
@ -1089,7 +1351,7 @@ TEST_P(QueryPlanExpandBfs, Where) {
auto filter_expr = AND(LESS(PROPERTY_LOOKUP(ident, prop), LITERAL(50)),
NEQ(PROPERTY_LOOKUP(ident, prop), LITERAL(12)));
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, filter_expr,
GraphView::OLD, {VertexAccessor(v[0], dba)});
{VertexAccessor(v[0], dba)});
ASSERT_EQ(results.size(), 4);
EXPECT_EQ(GetProp(results[0].second), 1);
EXPECT_EQ(GetProp(results[1].second), 4);
@ -1103,56 +1365,9 @@ TEST_P(QueryPlanExpandBfs, Where) {
}
}
TEST_P(QueryPlanExpandBfs, GraphState) {
auto ExpandSize = [this](GraphView graph_view) {
return ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, graph_view,
{VertexAccessor(v[0], dba)})
.size();
};
EXPECT_EQ(ExpandSize(GraphView::OLD), 5);
EXPECT_EQ(ExpandSize(GraphView::NEW), 5);
{
auto from = VertexAccessor(v[0], dba);
auto to = dba.InsertVertex();
v.push_back(to.GlobalAddress());
dba.InsertEdge(from, to, edge_type);
ApplyUpdates(dba.transaction_id());
}
EXPECT_EQ(ExpandSize(GraphView::OLD), 5);
EXPECT_EQ(ExpandSize(GraphView::NEW), 6);
AdvanceCommand(dba.transaction_id());
EXPECT_EQ(ExpandSize(GraphView::OLD), 6);
EXPECT_EQ(ExpandSize(GraphView::NEW), 6);
{
v.push_back(dba.InsertVertex().GlobalAddress());
AdvanceCommand(dba.transaction_id());
auto from = VertexAccessor(v[4], dba);
auto to = VertexAccessor(v[7], dba);
dba.InsertEdge(from, to, edge_type);
ApplyUpdates(dba.transaction_id());
}
EXPECT_EQ(ExpandSize(GraphView::OLD), 6);
EXPECT_EQ(ExpandSize(GraphView::NEW), 7);
AdvanceCommand(dba.transaction_id());
EXPECT_EQ(ExpandSize(GraphView::OLD), 7);
EXPECT_EQ(ExpandSize(GraphView::NEW), 7);
}
TEST_P(QueryPlanExpandBfs, MultipleInputs) {
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD,
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba), VertexAccessor(v[3], dba)});
// Expect that each vertex has been returned 2 times.
EXPECT_EQ(results.size(), 10);
@ -1162,6 +1377,8 @@ TEST_P(QueryPlanExpandBfs, MultipleInputs) {
}
TEST_P(QueryPlanExpandBfs, ExistingNode) {
// In single-node, this is handled by STShortestPath cursor instead of
// SingleSourceShortestPath cursor.
using testing::ElementsAre;
using testing::WhenSorted;
@ -1172,14 +1389,14 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
{
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD,
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba)}, VertexAccessor(v[3], dba));
EXPECT_EQ(results.size(), 1);
EXPECT_EQ(GetProp(results[0].second), 3);
}
{
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr,
GraphView::OLD, sources, VertexAccessor(v[5], dba));
auto results = ExpandBF(EdgeAtom::Direction::IN, 1, 1000, nullptr, sources,
VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
@ -1189,8 +1406,8 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
EXPECT_THAT(nodes, WhenSorted(ElementsAre(1, 2, 3, 4)));
}
{
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr,
GraphView::OLD, sources, VertexAccessor(v[5], dba));
auto results = ExpandBF(EdgeAtom::Direction::OUT, 1, 1000, nullptr, sources,
VertexAccessor(v[5], dba));
std::vector<int> nodes;
for (auto &row : results) {
@ -1204,13 +1421,12 @@ TEST_P(QueryPlanExpandBfs, ExistingNode) {
TEST_P(QueryPlanExpandBfs, OptionalMatch) {
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
GraphView::OLD, {TypedValue::Null});
{TypedValue::Null});
EXPECT_EQ(results.size(), 0);
}
{
auto results =
ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr, GraphView::OLD,
{VertexAccessor(v[0], dba)}, TypedValue::Null);
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 1, 1000, nullptr,
{VertexAccessor(v[0], dba)}, TypedValue::Null);
EXPECT_EQ(results.size(), 0);
}
}
@ -1218,7 +1434,7 @@ TEST_P(QueryPlanExpandBfs, OptionalMatch) {
TEST_P(QueryPlanExpandBfs, ExpansionDepth) {
{
auto results = ExpandBF(EdgeAtom::Direction::BOTH, 2, 3, nullptr,
GraphView::OLD, {VertexAccessor(v[0], dba)});
{VertexAccessor(v[0], dba)});
EXPECT_EQ(results.size(), 3);
if (GetProp(results[0].second) == 4) {
std::swap(results[0], results[1]);