Use std::async for fetching remote edges in Expand

Summary:
This may improve the query execution workload, because the Expand will
yield local results while it waits for remote ones. Note that we rely on
the fact that walking the graph produces results without any predetermined
order. Therefore, we can yield paths as we see fit. Enforcing the order
is done through OrderBy operator, and this change shouldn't affect that.

Reviewers: florijan, msantl, buda

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1215
This commit is contained in:
Teon Banek 2018-02-22 17:07:35 +01:00
parent 753aa07cdf
commit 452ce6a30c
5 changed files with 205 additions and 22 deletions

View File

@ -1,6 +1,7 @@
#include "query/plan/operator.hpp"
#include <algorithm>
#include <future>
#include <limits>
#include <queue>
#include <string>
@ -22,9 +23,10 @@
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpret/eval.hpp"
#include "query/path.hpp"
#include "utils/algorithm.hpp"
#include "utils/exceptions.hpp"
DEFINE_HIDDEN_int32(remote_pull_sleep, 2,
DEFINE_HIDDEN_int32(remote_pull_sleep, 1,
"Sleep between remote result pulling in milliseconds");
// macro for the default implementation of LogicalOperator::Accept
@ -527,33 +529,105 @@ bool Expand::ExpandCursor::Pull(Frame &frame, Context &context) {
}
};
auto push_future_edge = [this, &frame](auto edge, auto direction) {
auto edge_to = std::async(std::launch::async, [edge, direction]() {
if (direction == EdgeAtom::Direction::IN)
return std::make_pair(edge, edge.from());
if (direction == EdgeAtom::Direction::OUT)
return std::make_pair(edge, edge.to());
LOG(FATAL) << "Must indicate exact expansion direction here";
});
future_expands_.emplace_back(
FutureExpand{std::move(edge_to), frame.elems()});
};
auto find_ready_future = [this]() {
return std::find_if(future_expands_.begin(), future_expands_.end(),
[](const auto &future) {
return utils::IsFutureReady(future.edge_to);
});
};
auto put_future_edge_on_frame = [this, &frame](auto &future) {
auto edge_to = future.edge_to.get();
frame.elems() = future.frame_elems;
frame[self_.edge_symbol_] = edge_to.first;
frame[self_.node_symbol_] = edge_to.second;
};
while (true) {
if (db_.should_abort()) throw HintedAbortError();
// Try to get any remote edges we may have available first. If we yielded
// all of the local edges first, we may accumulate large amounts of future
// edges.
{
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
// Backup the current frame (if we haven't done so already) before
// putting the future edge.
if (last_frame_.empty()) last_frame_ = frame.elems();
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
}
// In case we have replaced the frame with the one for a future edge,
// restore it.
if (!last_frame_.empty()) {
frame.elems() = last_frame_;
last_frame_.clear();
}
// attempt to get a value from the incoming edges
if (in_edges_ && *in_edges_it_ != in_edges_->end()) {
EdgeAccessor edge = *(*in_edges_it_)++;
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
auto edge = *(*in_edges_it_)++;
if (edge.address().is_local() || self_.existing_node_) {
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::IN);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::IN);
continue;
}
}
// attempt to get a value from the outgoing edges
if (out_edges_ && *out_edges_it_ != out_edges_->end()) {
EdgeAccessor edge = *(*out_edges_it_)++;
auto edge = *(*out_edges_it_)++;
// when expanding in EdgeAtom::Direction::BOTH directions
// we should do only one expansion for cycles, and it was
// already done in the block above
if (self_.direction_ == EdgeAtom::Direction::BOTH && edge.is_cycle())
continue;
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
if (edge.address().is_local() || self_.existing_node_) {
frame[self_.edge_symbol_] = edge;
pull_node(edge, EdgeAtom::Direction::OUT);
return true;
} else {
push_future_edge(edge, EdgeAtom::Direction::OUT);
continue;
}
}
// if we are here, either the edges have not been initialized,
// or they have been exhausted. attempt to initialize the edges,
// if the input is exhausted
if (!InitEdges(frame, context)) return false;
if (!InitEdges(frame, context)) {
// We are done with local and remote edges so return false.
if (future_expands_.empty()) return false;
// We still need to yield remote edges.
auto future_it = find_ready_future();
if (future_it != future_expands_.end()) {
put_future_edge_on_frame(*future_it);
// Erase the future and return true to yield the result.
future_expands_.erase(future_it);
return true;
}
// We are still waiting for future edges, so sleep and fallthrough to
// continue the loop.
std::this_thread::sleep_for(
std::chrono::milliseconds(FLAGS_remote_pull_sleep));
}
// we have re-initialized the edges, continue with the loop
}
@ -565,6 +639,8 @@ void Expand::ExpandCursor::Reset() {
in_edges_it_ = std::experimental::nullopt;
out_edges_ = std::experimental::nullopt;
out_edges_it_ = std::experimental::nullopt;
future_expands_.clear();
last_frame_.clear();
}
namespace {
@ -1130,8 +1206,9 @@ class ExpandWeightedShortestPathCursor : public query::plan::Cursor {
self_.graph_view_);
// For the given (vertex, edge, vertex) tuple checks if they satisfy the
// "where" condition. if so, places them in the priority queue.
auto expand_pair = [this, &evaluator, &frame](
VertexAccessor from, EdgeAccessor edge, VertexAccessor vertex) {
auto expand_pair = [this, &evaluator, &frame](VertexAccessor from,
EdgeAccessor edge,
VertexAccessor vertex) {
SwitchAccessor(edge, self_.graph_view_);
SwitchAccessor(vertex, self_.graph_view_);
@ -2953,7 +3030,7 @@ bool PullRemote::PullRemoteCursor::Pull(Frame &frame, Context &context) {
if (found_it == remote_pulls_.end()) continue;
auto &remote_pull = found_it->second;
if (!remote_pull.valid()) continue;
if (!utils::IsFutureReady(remote_pull)) continue;
auto remote_results = remote_pull.get();
switch (remote_results.pull_state) {

View File

@ -3,6 +3,7 @@
#pragma once
#include <experimental/optional>
#include <future>
#include <memory>
#include <random>
#include <unordered_map>
@ -27,8 +28,6 @@
#include "utils/hashing/fnv.hpp"
#include "utils/visitor.hpp"
DECLARE_int32(remote_pull_sleep);
namespace database {
class GraphDbAccessor;
}
@ -779,7 +778,7 @@ class ExpandCommon {
* Filtering on existing means that for a pattern that references
* an already declared node or edge (for example in
* MATCH (a) MATCH (a)--(b)),
* only expansions that match defined equalities are succesfully
* only expansions that match defined equalities are successfully
* pulled.
*/
class Expand : public LogicalOperator, public ExpandCommon {
@ -802,6 +801,11 @@ class Expand : public LogicalOperator, public ExpandCommon {
void Reset() override;
private:
struct FutureExpand {
std::future<std::pair<EdgeAccessor, VertexAccessor>> edge_to;
std::vector<TypedValue> frame_elems;
};
const Expand &self_;
const std::unique_ptr<Cursor> input_cursor_;
database::GraphDbAccessor &db_;
@ -813,6 +817,11 @@ class Expand : public LogicalOperator, public ExpandCommon {
std::experimental::optional<InEdgeIteratorT> in_edges_it_;
std::experimental::optional<OutEdgeT> out_edges_;
std::experimental::optional<OutEdgeIteratorT> out_edges_it_;
// Edges which are being asynchronously fetched from a remote worker.
std::vector<FutureExpand> future_expands_;
// Stores the last frame before we yield the frame for future edge. It needs
// to be restored afterward.
std::vector<TypedValue> last_frame_;
bool InitEdges(Frame &, Context &);
};

View File

@ -1,6 +1,8 @@
#pragma once
#include <algorithm>
#include <chrono>
#include <future>
#include <string>
#include <utility>
@ -122,4 +124,16 @@ class Iterable {
TIterator begin_;
TIterator end_;
};
/**
* Returns true if the future has the result available.
* NOTE: The behaviour is undefined if future isn't valid, i.e.
* `future.valid() == false`.
*/
template <typename T>
bool IsFutureReady(const std::future<T> &future) {
auto status = future.wait_for(std::chrono::seconds(0));
return status == std::future_status::ready;
}
} // namespace utils

View File

@ -75,10 +75,15 @@ class DistributedGraphDbTest : public ::testing::Test {
}
/// Inserts an edge (on the 'from' side) and returns it's global address.
storage::EdgeAddress InsertEdge(storage::VertexAddress from,
storage::VertexAddress to,
const std::string &edge_type_name) {
database::GraphDbAccessor dba{worker(from.worker_id())};
auto InsertEdge(storage::VertexAddress from, storage::VertexAddress to,
const std::string &edge_type_name) {
CHECK(from.is_remote() && to.is_remote())
<< "Distributed test InsertEdge only takes global addresses";
auto db_for_vertex = [this](const auto &vertex) -> database::GraphDb & {
if (vertex.worker_id()) return worker(vertex.worker_id());
return master();
};
database::GraphDbAccessor dba(db_for_vertex(from));
auto from_v = dba.FindVertexChecked(from.gid(), false);
auto edge_type = dba.EdgeType(edge_type_name);
@ -95,8 +100,7 @@ class DistributedGraphDbTest : public ::testing::Test {
dba.db().storage().EdgeGenerator().Next())
.GlobalAddress();
from_v.update().out_.emplace(to, edge_ga, edge_type);
database::GraphDbAccessor dba_to{worker(to.worker_id()),
dba.transaction_id()};
database::GraphDbAccessor dba_to(db_for_vertex(to), dba.transaction_id());
auto to_v = dba_to.FindVertexChecked(to.gid(), false);
to_v.update().in_.emplace(from, edge_ga, edge_type);

View File

@ -1,4 +1,5 @@
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "database/graph_db.hpp"
#include "distributed_common.hpp"
@ -47,3 +48,81 @@ TEST_F(DistributedGraphDbTest, RemotePullNoResultsTest) {
EXPECT_EQ(result.GetHeader()[0], "n");
ASSERT_EQ(result.GetResults().size(), 0U);
}
TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
// Make a fully connected graph with vertices scattered across master and
// worker storage.
// Vertex count is low, because test gets exponentially slower. The expected
// result size is ~ vertices^3, and then that is compared at the end in no
// particular order which causes O(result_size^2) comparisons.
int verts_per_storage = 3;
std::vector<storage::VertexAddress> vertices;
vertices.reserve(verts_per_storage * 3);
auto add_vertices = [this, &vertices, &verts_per_storage](auto &db) {
for (int i = 0; i < verts_per_storage; ++i)
vertices.push_back(InsertVertex(db));
};
add_vertices(master());
add_vertices(worker(1));
add_vertices(worker(2));
auto get_edge_type = [](int v1, int v2) {
return std::to_string(v1) + "-" + std::to_string(v2);
};
std::vector<std::string> edge_types;
edge_types.reserve(vertices.size() * vertices.size());
for (int i = 0; i < vertices.size(); ++i) {
for (int j = 0; j < vertices.size(); ++j) {
auto edge_type = get_edge_type(i, j);
edge_types.push_back(edge_type);
InsertEdge(vertices[i], vertices[j], edge_type);
}
}
query::Interpreter interpret;
std::map<std::string, query::TypedValue> params;
GraphDbAccessor dba(master());
ResultStreamFaker result;
interpret("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)", dba,
params, false)
.PullAll(result);
ASSERT_EQ(result.GetHeader().size(), 2U);
// We expect the number of results to be:
size_t expected_result_size =
// pick (n)
vertices.size() *
// pick both directed edges to other (m) and a
// single edge to (m) which equals (n), hence -1
(2 * vertices.size() - 1) *
// Pick as before, but exclude the previously taken edge, hence another -1
(2 * vertices.size() - 1 - 1);
std::vector<std::vector<std::string>> expected;
expected.reserve(expected_result_size);
for (int n = 0; n < vertices.size(); ++n) {
for (int m = 0; m < vertices.size(); ++m) {
std::vector<std::string> r1s{get_edge_type(n, m)};
if (n != m) r1s.push_back(get_edge_type(m, n));
for (int l = 0; l < vertices.size(); ++l) {
std::vector<std::string> r2s{get_edge_type(m, l)};
if (m != l) r2s.push_back(get_edge_type(l, m));
for (const auto &r1 : r1s) {
for (const auto &r2 : r2s) {
if (r1 == r2) continue;
expected.push_back({r1, r2});
}
}
}
}
}
ASSERT_EQ(expected.size(), expected_result_size);
ASSERT_EQ(result.GetResults().size(), expected_result_size);
std::vector<std::vector<std::string>> got;
got.reserve(result.GetResults().size());
for (const auto &res : result.GetResults()) {
std::vector<std::string> row;
row.reserve(res.size());
for (const auto &col : res) {
row.push_back(col.Value<std::string>());
}
got.push_back(row);
}
ASSERT_THAT(got, testing::UnorderedElementsAreArray(expected));
}