Add two state dijkstra for wsp implementation

Summary:
Added a new markdown file for concepts and a new test to test the edge
case with upper bound.

Reviewers: teon.banek, mtomic, dgleich, buda, ipaljak

Reviewed By: teon.banek, dgleich

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1366
This commit is contained in:
Matija Santl 2018-04-19 11:25:02 +02:00
parent 7af80ebb8d
commit 5c7d3a908f
5 changed files with 204 additions and 57 deletions

View File

@ -12,6 +12,7 @@ data structures, multi-version concurrency control and asynchronous IO.
* [About Memgraph](#about-memgraph)
* [Quick Start](quick-start.md)
* [Concepts](concepts.md)
* [Examples](examples.md)
* [Drivers](drivers.md)
* [Data Storage](storage.md)

View File

@ -0,0 +1,68 @@
## Concepts
### Weighted Shortest Path
Weighted shortest path problem is the problem of finding a path between two
nodes in a graph such that the sum of the weights of edges connecting nodes on
the path is minimized.
More about the *weighted shortest path* problem can be found
[here](https://en.wikipedia.org/wiki/Shortest_path_problem).
## Implementation
Our implementation of the *weighted shortest path* algorithm uses a modified
version of Dijkstra's algorithm that can handle length restriction. The length
restriction parameter is optional, and when it's not set it could increase the
complexity of the algorithm.
A sample query that finds a shortest path between two nodes can look like this:
MATCH (a {id: 723})-[le *wShortest 10 (e, n | e.weight) total_weight]-(b {id: 882}) RETURN *
This query has a upper bound length restriction set to `10`. This means that no
path that traverses more than `10` edges will be considered as a valid result.
#### Upper Bound Implications
Since the upper bound parameter is optional, we can have different results based
on this parameter.
Lets take a look at the following graph and queries.
```
5 5
/-----[1]-----\
/ \
/ \ 2
[0] [4]---------[5]
\ /
\ /
\--[2]---[3]--/
3 3 3
```
MATCH (a {id: 0})-[le *wShortest 3 (e, n | e.weight) total_weight]-(b {id: 5}) RETURN *
MATCH (a {id: 0})-[le *wShortest (e, n | e.weight) total_weight]-(b {id: 5}) RETURN *
The first query will try to find the weighted shortest path between nodes `0`
and `5` with the restriction on the path length set to `3`, and the second query
will try to find the weighted shortest path with no restriction on the path
length.
The expected result for the first query is `0 -> 1 -> 4 -> 5` with total cost of
`12`, while the expected result for the second query is `0 -> 2 -> 3 -> 4 -> 5`
with total cost of `11`. Obviously, the second query can find the true shortest
path because it has no restrictions on the length.
To handle cases when the length restriction is set, *weighted shortest path*
algorithm uses both vertex and distance as the state. This causes the search
space to increase by the factor of the given upper bound. On the other hand, not
setting the upper bound parameter, the search space might contain the whole
graph.
Because of this, one should always try to narrow down the upper bound limit to
be as precise as possible in order to have a more performant query.

View File

@ -502,7 +502,7 @@ shortest path expansion:
MATCH (a {id: 723})-[le *wShortest 10 (e, n | e.weight) total_weight]-(b {id: 882}) RETURN *
The above query will find the shortest path of length up to 10 nodes between
nodes `a` and `b`.
nodes `a` and `b`. The length restriction parameter is optional.
Weighted Shortest Path expansion allows an arbitrary expression that determines
the weight for the current expansion. Total weight of a path is calculated as

View File

@ -5,7 +5,10 @@
#include <queue>
#include <random>
#include <string>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "boost/archive/binary_iarchive.hpp"
@ -25,6 +28,7 @@
#include "query/path.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
#include "utils/hashing/fnv.hpp"
DEFINE_HIDDEN_int32(remote_pull_sleep_micros, 10,
"Sleep between remote result pulling in microseconds");
@ -1212,10 +1216,15 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
ExpressionEvaluator evaluator(frame, context.parameters_,
context.symbol_table_, db_,
self_.graph_view_);
// For the given (vertex, edge, vertex) tuple checks if they satisfy the
// "where" condition. if so, places them in the priority queue.
auto expand_pair = [this, &evaluator, &frame](
VertexAccessor from, EdgeAccessor edge, VertexAccessor vertex) {
auto create_state = [this](VertexAccessor vertex, int depth) {
return std::make_pair(vertex, upper_bound_set_ ? depth : 0);
};
// For the given (edge, vertex, weight, depth) tuple checks if they
// satisfy the "where" condition. if so, places them in the priority queue.
auto expand_pair = [this, &evaluator, &frame, &create_state](
EdgeAccessor edge, VertexAccessor vertex, double weight, int depth) {
SwitchAccessor(edge, self_.graph_view_);
SwitchAccessor(vertex, self_.graph_view_);
@ -1240,28 +1249,29 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
throw QueryRuntimeException("Calculated weight can't be negative!");
}
auto total_weight = weights_[from] + typed_weight;
auto found_it = weights_.find(vertex);
if (found_it != weights_.end() &&
found_it->second.Value<double>() <= total_weight.Value<double>())
auto next_state = create_state(vertex, depth);
auto next_weight = weight + typed_weight;
auto found_it = total_cost_.find(next_state);
if (found_it != total_cost_.end() &&
found_it->second.Value<double>() <= next_weight.Value<double>())
return;
pq_.push(std::make_pair(std::make_pair(vertex, edge),
total_weight.Value<double>()));
pq_.push({next_weight.Value<double>(), depth + 1, vertex, edge});
};
// Populates the priority queue structure with expansions
// from the given vertex. skips expansions that don't satisfy
// the "where" condition.
auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex) {
auto expand_from_vertex = [this, &expand_pair](VertexAccessor &vertex,
double weight, int depth) {
if (self_.direction_ != EdgeAtom::Direction::IN) {
for (const EdgeAccessor &edge : vertex.out(&self_.edge_types_)) {
expand_pair(vertex, edge, edge.to());
expand_pair(edge, edge.to(), weight, depth);
}
}
if (self_.direction_ != EdgeAtom::Direction::OUT) {
for (const EdgeAccessor &edge : vertex.in(&self_.edge_types_)) {
expand_pair(vertex, edge, edge.from());
expand_pair(edge, edge.from(), weight, depth);
}
}
};
@ -1279,11 +1289,15 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
if (node.IsNull()) continue;
}
SwitchAccessor(vertex, self_.graph_view_);
if (self_.upper_bound_) {
upper_bound_ =
self_.upper_bound_
? EvaluateInt(evaluator, self_.upper_bound_,
"Max depth in weighted shortest path expansion")
: std::numeric_limits<int>::max();
EvaluateInt(evaluator, self_.upper_bound_,
"Max depth in weighted shortest path expansion");
upper_bound_set_ = true;
} else {
upper_bound_ = std::numeric_limits<int>::max();
upper_bound_set_ = false;
}
if (upper_bound_ < 1)
throw QueryRuntimeException(
"Max depth in weighted shortest path expansion must be greater "
@ -1291,53 +1305,70 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
// Clear existing data structures.
previous_.clear();
weights_.clear();
total_cost_.clear();
yielded_vertices_.clear();
pq_.push(std::make_pair(
std::make_pair(vertex, std::experimental::nullopt), 0.0));
pq_.push({0.0, 0, vertex, std::experimental::nullopt});
// We are adding the starting vertex to the set of yielded vertices
// because we don't want to yield paths that end with the starting
// vertex.
yielded_vertices_.insert(vertex);
}
while (!pq_.empty()) {
auto current = pq_.top();
double current_weight = std::get<0>(current);
int current_depth = std::get<1>(current);
VertexAccessor current_vertex = std::get<2>(current);
std::experimental::optional<EdgeAccessor> current_edge =
std::get<3>(current);
pq_.pop();
// Check if the edge has already been processed.
if (weights_.find(current.first.first) != weights_.end()) {
auto current_state = create_state(current_vertex, current_depth);
// Check if the vertex has already been processed.
if (total_cost_.find(current_state) != total_cost_.end()) {
continue;
}
previous_.emplace(current.first.first, current.first.second);
weights_.emplace(current.first.first, current.second);
previous_.emplace(current_state, current_edge);
total_cost_.emplace(current_state, current_weight);
// Expand only if what we've just expanded is less than max depth.
if (current_depth < upper_bound_)
expand_from_vertex(current_vertex, current_weight, current_depth);
// If we yielded a path for a vertex already, make the expansion but
// don't return the path again.
if (yielded_vertices_.find(current_vertex) != yielded_vertices_.end())
continue;
// Reconstruct the path.
auto last_vertex = current.first.first;
auto last_vertex = current_vertex;
auto last_depth = current_depth;
std::vector<TypedValue> edge_list{};
while (true) {
// Origin_vertex must be in previous.
const auto &previous_edge = previous_.find(last_vertex)->second;
const auto &previous_edge =
previous_.find(create_state(last_vertex, last_depth))->second;
if (!previous_edge) break;
last_vertex = previous_edge->from() == last_vertex
? previous_edge->to()
: previous_edge->from();
last_depth--;
edge_list.push_back(previous_edge.value());
}
// Expand only if what we've just expanded is less then max depth.
if (static_cast<int>(edge_list.size()) < upper_bound_)
expand_from_vertex(current.first.first);
if (edge_list.empty()) continue;
// Place destination node on the frame, handle existence flag.
if (self_.existing_node_) {
TypedValue &node = frame[self_.node_symbol_];
if ((node != current.first.first).Value<bool>())
if ((node != current_vertex).Value<bool>())
continue;
else
// Prevent expanding other paths, because we found the
// shortest to existing node.
ClearQueue();
} else {
frame[self_.node_symbol_] = current.first.first;
frame[self_.node_symbol_] = current_vertex;
}
if (!self_.is_reverse_) {
@ -1345,7 +1376,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
std::reverse(edge_list.begin(), edge_list.end());
}
frame[self_.edge_symbol_] = std::move(edge_list);
frame[self_.total_weight_.value()] = current.second;
frame[self_.total_weight_.value()] = current_weight;
yielded_vertices_.insert(current_vertex);
return true;
}
}
@ -1354,7 +1386,8 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
void Reset() override {
input_cursor_->Reset();
previous_.clear();
weights_.clear();
total_cost_.clear();
yielded_vertices_.clear();
ClearQueue();
}
@ -1365,34 +1398,43 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
// Upper bound on the path length.
int upper_bound_{-1};
bool upper_bound_set_{false};
struct WspStateHash {
size_t operator()(const std::pair<VertexAccessor, int> &key) const {
return HashCombine<VertexAccessor, int>{}(key.first, key.second);
}
};
// Maps vertices to weights they got in expansion.
std::unordered_map<VertexAccessor, TypedValue> weights_;
std::unordered_map<std::pair<VertexAccessor, int>, TypedValue, WspStateHash>
total_cost_;
// Maps vertices to edges used to reach them.
std::unordered_map<VertexAccessor, std::experimental::optional<EdgeAccessor>>
std::unordered_map<std::pair<VertexAccessor, int>,
std::experimental::optional<EdgeAccessor>, WspStateHash>
previous_;
// Keeps track of vertices for which we yielded a path already.
std::unordered_set<VertexAccessor> yielded_vertices_;
// Priority queue comparator. Keep lowest weight on top of the queue.
class PriorityQueueComparator {
public:
bool operator()(
const std::pair<std::pair<VertexAccessor,
std::experimental::optional<EdgeAccessor>>,
double> &lhs,
const std::pair<std::pair<VertexAccessor,
std::experimental::optional<EdgeAccessor>>,
double> &rhs) {
return lhs.second > rhs.second;
const std::tuple<double, int, VertexAccessor,
std::experimental::optional<EdgeAccessor>> &lhs,
const std::tuple<double, int, VertexAccessor,
std::experimental::optional<EdgeAccessor>> &rhs) {
return std::get<0>(lhs) > std::get<0>(rhs);
}
};
std::priority_queue<
std::pair<
std::pair<VertexAccessor, std::experimental::optional<EdgeAccessor>>,
double>,
std::vector<std::pair<
std::pair<VertexAccessor, std::experimental::optional<EdgeAccessor>>,
double>>,
std::tuple<double, int, VertexAccessor,
std::experimental::optional<EdgeAccessor>>,
std::vector<std::tuple<double, int, VertexAccessor,
std::experimental::optional<EdgeAccessor>>>,
PriorityQueueComparator>
pq_;

View File

@ -1108,9 +1108,10 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test {
for (auto &edge : e) edge.second.Reconstruct();
}
// defines and performs a breadth-first expansion with the given params
// defines and performs a weighted shortest expansion with the given params
// returns a vector of pairs. each pair is (vector-of-edges, vertex)
auto ExpandWShortest(EdgeAtom::Direction direction, int max_depth,
auto ExpandWShortest(EdgeAtom::Direction direction,
std::experimental::optional<int> max_depth,
Expression *where,
GraphView graph_view = GraphView::AS_IS,
std::experimental::optional<int> node_id = 0,
@ -1137,7 +1138,8 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test {
auto filter_lambda = last_op = std::make_shared<ExpandVariable>(
node_sym, edge_list_sym, EdgeAtom::Type::WEIGHTED_SHORTEST_PATH,
direction, std::vector<storage::EdgeType>{}, false, nullptr,
LITERAL(max_depth), last_op, n.sym_, existing_node_input != nullptr,
max_depth ? LITERAL(max_depth.value()) : nullptr, last_op, n.sym_,
existing_node_input != nullptr,
ExpandVariable::Lambda{filter_edge, filter_node, where},
ExpandVariable::Lambda{weight_edge, weight_node,
PROPERTY_LOOKUP(ident_e, prop)},
@ -1183,7 +1185,7 @@ class QueryPlanExpandWeightedShortestPath : public testing::Test {
// / \
// / 12 \ 2
// [0]--------<--------[4]------->-------[5]
// \ / (only for GraphState test)
// \ / (on some tests only)
// \ /
// \->[2]->-[3]->/
// 3 3 3
@ -1324,6 +1326,40 @@ TEST_F(QueryPlanExpandWeightedShortestPath, ExistingNode) {
}
TEST_F(QueryPlanExpandWeightedShortestPath, UpperBound) {
{
auto results = ExpandWShortest(EdgeAtom::Direction::BOTH,
std::experimental::nullopt, LITERAL(true));
ASSERT_EQ(results.size(), 4);
EXPECT_EQ(GetProp(results[0].vertex), 2);
EXPECT_EQ(results[0].total_weight, 3);
EXPECT_EQ(GetProp(results[1].vertex), 1);
EXPECT_EQ(results[1].total_weight, 5);
EXPECT_EQ(GetProp(results[2].vertex), 3);
EXPECT_EQ(results[2].total_weight, 6);
EXPECT_EQ(GetProp(results[3].vertex), 4);
EXPECT_EQ(results[3].total_weight, 9);
}
{
auto new_vertex = dba.InsertVertex();
new_vertex.PropsSet(prop.second, 5);
auto edge = dba.InsertEdge(v[4], new_vertex, edge_type);
edge.PropsSet(prop.second, 2);
auto results = ExpandWShortest(EdgeAtom::Direction::BOTH, 3, LITERAL(true),
GraphView::NEW);
ASSERT_EQ(results.size(), 5);
EXPECT_EQ(GetProp(results[0].vertex), 2);
EXPECT_EQ(results[0].total_weight, 3);
EXPECT_EQ(GetProp(results[1].vertex), 1);
EXPECT_EQ(results[1].total_weight, 5);
EXPECT_EQ(GetProp(results[2].vertex), 3);
EXPECT_EQ(results[2].total_weight, 6);
EXPECT_EQ(GetProp(results[3].vertex), 4);
EXPECT_EQ(results[3].total_weight, 9);
EXPECT_EQ(GetProp(results[4].vertex), 5);
EXPECT_EQ(results[4].total_weight, 12);
}
{
auto results = ExpandWShortest(EdgeAtom::Direction::BOTH, 2, LITERAL(true));
ASSERT_EQ(results.size(), 4);