Support distributed query::plan::CreateExpand

Summary:
- The expansion vertex gets created on the origin's worker
- The edge automatically gets created wherever necessary
- Vertex creation logic reuse
- End to end test

Reviewers: teon.banek, msantl

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1227
This commit is contained in:
florijan 2018-02-22 17:20:17 +01:00
parent c83078e7d9
commit cbc9420c17
6 changed files with 388 additions and 364 deletions

View File

@ -4,6 +4,7 @@
#include <future>
#include <limits>
#include <queue>
#include <random>
#include <string>
#include <type_traits>
#include <utility>
@ -102,6 +103,65 @@ CreateNode::CreateNode(const std::shared_ptr<LogicalOperator> &input,
node_atom_(node_atom),
on_random_worker_(on_random_worker) {}
namespace {
// Returns a random worker id. Worker ID is obtained from the Db.
int RandomWorkerId(database::GraphDb &db) {
thread_local std::mt19937 gen_{std::random_device{}()};
thread_local std::uniform_int_distribution<int> rand_;
auto worker_ids = db.GetWorkerIds();
return worker_ids[rand_(gen_) % worker_ids.size()];
}
// Creates a vertex on this GraphDb. Returns a reference to vertex placed on the
// frame.
VertexAccessor &CreateLocalVertex(NodeAtom *node_atom, Frame &frame,
Context &context) {
auto &dba = context.db_accessor_;
auto new_node = dba.InsertVertex();
for (auto label : node_atom->labels_) new_node.add_label(label);
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, context.parameters_,
context.symbol_table_, dba, GraphView::NEW);
for (auto &kv : node_atom->properties_)
PropsSetChecked(new_node, kv.first.second, kv.second->Accept(evaluator));
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}
// Creates a vertex on the GraphDb with the given worker_id. Can be this worker.
VertexAccessor &CreateVertexOnWorker(int worker_id, NodeAtom *node_atom,
Frame &frame, Context &context) {
auto &dba = context.db_accessor_;
if (worker_id == dba.db().WorkerId())
return CreateLocalVertex(node_atom, frame, context);
std::unordered_map<storage::Property, query::TypedValue> properties;
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, context.parameters_,
context.symbol_table_, dba, GraphView::NEW);
for (auto &kv : node_atom->properties_) {
auto value = kv.second->Accept(evaluator);
if (!value.IsPropertyValue()) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
}
properties.emplace(kv.first.second, std::move(value));
}
auto new_node =
dba.InsertVertexIntoRemote(worker_id, node_atom->labels_, properties);
frame[context.symbol_table_.at(*node_atom->identifier_)] = new_node;
return frame[context.symbol_table_.at(*node_atom->identifier_)].ValueVertex();
}
} // namespace
ACCEPT_WITH_INPUT(CreateNode)
std::unique_ptr<Cursor> CreateNode::MakeCursor(
@ -123,15 +183,10 @@ CreateNode::CreateNodeCursor::CreateNodeCursor(const CreateNode &self,
bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
if (input_cursor_->Pull(frame, context)) {
if (self_.on_random_worker_) {
auto worker_ids = context.db_accessor_.db().GetWorkerIds();
auto worker_id = worker_ids[rand_(gen_) % worker_ids.size()];
if (worker_id == context.db_accessor_.db().WorkerId()) {
CreateLocally(frame, context);
} else {
CreateOnWorker(worker_id, frame, context);
}
CreateVertexOnWorker(RandomWorkerId(db_.db()), self_.node_atom_, frame,
context);
} else {
CreateLocally(frame, context);
CreateLocalVertex(self_.node_atom_, frame, context);
}
return true;
}
@ -140,43 +195,6 @@ bool CreateNode::CreateNodeCursor::Pull(Frame &frame, Context &context) {
void CreateNode::CreateNodeCursor::Reset() { input_cursor_->Reset(); }
void CreateNode::CreateNodeCursor::CreateLocally(Frame &frame,
Context &context) {
auto new_node = db_.InsertVertex();
for (auto label : self_.node_atom_->labels_) new_node.add_label(label);
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, context.parameters_,
context.symbol_table_, db_, GraphView::NEW);
for (auto &kv : self_.node_atom_->properties_)
PropsSetChecked(new_node, kv.first.second, kv.second->Accept(evaluator));
frame[context.symbol_table_.at(*self_.node_atom_->identifier_)] = new_node;
}
void CreateNode::CreateNodeCursor::CreateOnWorker(int worker_id, Frame &frame,
Context &context) {
std::unordered_map<storage::Property, query::TypedValue> properties;
// Evaluator should use the latest accessors, as modified in this query, when
// setting properties on new nodes.
ExpressionEvaluator evaluator(frame, context.parameters_,
context.symbol_table_, db_, GraphView::NEW);
for (auto &kv : self_.node_atom_->properties_) {
auto value = kv.second->Accept(evaluator);
if (!value.IsPropertyValue()) {
throw QueryRuntimeException("'{}' cannot be used as a property value.",
value.type());
}
properties.emplace(kv.first.second, std::move(value));
}
auto new_node = context.db_accessor_.InsertVertexIntoRemote(
worker_id, self_.node_atom_->labels_, properties);
frame[context.symbol_table_.at(*self_.node_atom_->identifier_)] = new_node;
}
CreateExpand::CreateExpand(NodeAtom *node_atom, EdgeAtom *edge_atom,
const std::shared_ptr<LogicalOperator> &input,
Symbol input_symbol, bool existing_node)
@ -221,7 +239,7 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
v1.SwitchNew();
// get the destination vertex (possibly an existing node)
auto &v2 = OtherVertex(frame, context.symbol_table_, evaluator);
auto &v2 = OtherVertex(v1.GlobalAddress().worker_id(), frame, context);
v2.SwitchNew();
// create an edge between the two nodes
@ -246,23 +264,15 @@ bool CreateExpand::CreateExpandCursor::Pull(Frame &frame, Context &context) {
void CreateExpand::CreateExpandCursor::Reset() { input_cursor_->Reset(); }
VertexAccessor &CreateExpand::CreateExpandCursor::OtherVertex(
Frame &frame, const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator) {
int worker_id, Frame &frame, Context &context) {
if (self_.existing_node_) {
const auto &dest_node_symbol =
symbol_table.at(*self_.node_atom_->identifier_);
context.symbol_table_.at(*self_.node_atom_->identifier_);
TypedValue &dest_node_value = frame[dest_node_symbol];
ExpectType(dest_node_symbol, dest_node_value, TypedValue::Type::Vertex);
return dest_node_value.Value<VertexAccessor>();
} else {
// the node does not exist, it needs to be created
auto node = db_.InsertVertex();
for (auto label : self_.node_atom_->labels_) node.add_label(label);
for (auto kv : self_.node_atom_->properties_)
PropsSetChecked(node, kv.first.second, kv.second->Accept(evaluator));
auto symbol = symbol_table.at(*self_.node_atom_->identifier_);
frame[symbol] = node;
return frame[symbol].Value<VertexAccessor>();
return CreateVertexOnWorker(worker_id, self_.node_atom_, frame, context);
}
}

View File

@ -5,7 +5,6 @@
#include <experimental/optional>
#include <future>
#include <memory>
#include <random>
#include <unordered_map>
#include <unordered_set>
#include <utility>
@ -274,16 +273,6 @@ class CreateNode : public LogicalOperator {
const CreateNode &self_;
database::GraphDbAccessor &db_;
const std::unique_ptr<Cursor> input_cursor_;
// For random worker choosing in distributed.
std::mt19937 gen_{std::random_device{}()};
std::uniform_int_distribution<int> rand_;
/** Creates a single node locally and places it in the frame. */
void CreateLocally(Frame &, Context &);
/** Creates a single node on the given worker and places it in the frame. */
void CreateOnWorker(int worker_id, Frame &, Context &);
};
friend class boost::serialization::access;
@ -372,12 +361,9 @@ class CreateExpand : public LogicalOperator {
database::GraphDbAccessor &db_;
const std::unique_ptr<Cursor> input_cursor_;
/**
* Helper function for getting an existing node or creating a new one.
* @return The newly created or already existing node.
*/
VertexAccessor &OtherVertex(Frame &frame, const SymbolTable &symbol_table,
ExpressionEvaluator &evaluator);
/** Gets the existing node (if existing_node_ == true), or creates a new
* node (on the given worker) and returns it. */
VertexAccessor &OtherVertex(int worker_id, Frame &frame, Context &context);
/**
* Helper function for creating an edge and adding it

View File

@ -109,6 +109,12 @@ class DistributedGraphDbTest : public ::testing::Test {
return edge_ga;
}
auto VertexCount(database::GraphDb &db) {
database::GraphDbAccessor dba{db};
auto vertices = dba.Vertices(false);
return std::distance(vertices.begin(), vertices.end());
};
private:
std::unique_ptr<database::Master> master_;
std::vector<std::unique_ptr<WorkerInThread>> workers_;

View File

@ -119,164 +119,6 @@ TEST_F(DistributedGraphDbTest, DispatchPlan) {
check_for_worker(worker(2));
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// Query plan for: UNWIND [42, true, "bla", 1, 2] as x RETURN x
using namespace query;
auto list =
LIST(LITERAL(42), LITERAL(true), LITERAL("bla"), LITERAL(1), LITERAL(2));
auto x = ctx.symbol_table_.CreateSymbol("x", true);
auto unwind = std::make_shared<plan::Unwind>(nullptr, list, x);
auto x_expr = IDENT("x");
ctx.symbol_table_[*x_expr] = x;
auto x_ne = NEXPR("x", x_expr);
ctx.symbol_table_[*x_ne] = ctx.symbol_table_.CreateSymbol("x_ne", true);
auto produce = MakeProduce(unwind, x_ne);
// Test that the plan works locally.
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
ASSERT_EQ(results.size(), 5);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, false, 3);
};
auto expect_first_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state,
distributed::RemotePullState::CURSOR_IN_PROGRESS);
ASSERT_EQ(batch.frames.size(), 3);
ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 42);
EXPECT_EQ(batch.frames[1][0].ValueBool(), true);
EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
};
auto expect_second_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
ASSERT_EQ(batch.frames.size(), 2);
ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
EXPECT_EQ(batch.frames[1][0].ValueInt(), 2);
};
GraphDbAccessor dba_1{master()};
GraphDbAccessor dba_2{master()};
for (int worker_id : {1, 2}) {
// TODO flor, proper test async here.
auto tx1_batch1 = remote_pull(dba_1, worker_id).get();
expect_first_batch(tx1_batch1);
auto tx2_batch1 = remote_pull(dba_2, worker_id).get();
expect_first_batch(tx2_batch1);
auto tx2_batch2 = remote_pull(dba_2, worker_id).get();
expect_second_batch(tx2_batch2);
auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
expect_second_batch(tx1_batch2);
}
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
// Create some data on the master and both workers. Eeach edge (3 of them) and
// vertex (6 of them) will be uniquely identified with their worker id and
// sequence ID, so we can check we retrieved all.
storage::Property prop;
{
GraphDbAccessor dba{master()};
prop = dba.Property("prop");
auto create_data = [prop](GraphDbAccessor &dba, int worker_id) {
auto v1 = dba.InsertVertex();
v1.PropsSet(prop, worker_id * 10);
auto v2 = dba.InsertVertex();
v2.PropsSet(prop, worker_id * 10 + 1);
auto e12 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
e12.PropsSet(prop, worker_id * 10 + 2);
};
create_data(dba, 0);
GraphDbAccessor dba_w1{worker(1), dba.transaction_id()};
create_data(dba_w1, 1);
GraphDbAccessor dba_w2{worker(2), dba.transaction_id()};
create_data(dba_w2, 2);
dba.Commit();
}
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// Query plan for: MATCH p = (n)-[r]->(m) return [n, r], m, p
// Use this query to test graph elements are transferred correctly in
// collections too.
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::OUT, {}, "m", false, GraphView::OLD);
auto p_sym = ctx.symbol_table_.CreateSymbol("p", true);
auto p = std::make_shared<query::plan::ConstructNamedPath>(
r_m.op_, p_sym,
std::vector<Symbol>{n.sym_, r_m.edge_sym_, r_m.node_sym_});
auto return_n = IDENT("n");
ctx.symbol_table_[*return_n] = n.sym_;
auto return_r = IDENT("r");
ctx.symbol_table_[*return_r] = r_m.edge_sym_;
auto return_n_r = NEXPR("[n, r]", LIST(return_n, return_r));
ctx.symbol_table_[*return_n_r] = ctx.symbol_table_.CreateSymbol("", true);
auto return_m = NEXPR("m", IDENT("m"));
ctx.symbol_table_[*return_m->expression_] = r_m.node_sym_;
ctx.symbol_table_[*return_m] = ctx.symbol_table_.CreateSymbol("", true);
auto return_p = NEXPR("p", IDENT("p"));
ctx.symbol_table_[*return_p->expression_] = p_sym;
ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
auto produce = MakeProduce(p, return_n_r, return_m, return_p);
auto check_result = [prop](
int worker_id,
const std::vector<std::vector<query::TypedValue>> &frames) {
int offset = worker_id * 10;
ASSERT_EQ(frames.size(), 1);
auto &row = frames[0];
ASSERT_EQ(row.size(), 3);
auto &list = row[0].ValueList();
ASSERT_EQ(list.size(), 2);
ASSERT_EQ(list[0].ValueVertex().PropsAt(prop).Value<int64_t>(), offset);
ASSERT_EQ(list[1].ValueEdge().PropsAt(prop).Value<int64_t>(), offset + 2);
ASSERT_EQ(row[1].ValueVertex().PropsAt(prop).Value<int64_t>(), offset + 1);
auto &path = row[2].ValuePath();
ASSERT_EQ(path.size(), 1);
ASSERT_EQ(path.vertices()[0].PropsAt(prop).Value<int64_t>(), offset);
ASSERT_EQ(path.edges()[0].PropsAt(prop).Value<int64_t>(), offset + 2);
ASSERT_EQ(path.vertices()[1].PropsAt(prop).Value<int64_t>(), offset + 1);
};
// Test that the plan works locally.
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
check_result(0, results);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
ctx.symbol_table_[*return_m], p_sym};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, false, 3);
};
auto future_w1_results = remote_pull(dba, 1);
auto future_w2_results = remote_pull(dba, 2);
check_result(1, future_w1_results.get().frames);
check_result(2, future_w2_results.get().frames);
}
TEST_F(DistributedGraphDbTest, BuildIndexDistributed) {
storage::Label label;
storage::Property property;
@ -334,85 +176,3 @@ TEST_F(DistributedGraphDbTest, WorkerOwnedDbAccessors) {
VertexAccessor v_in_w2{v_ga, dba_w2};
EXPECT_EQ(v_in_w2.PropsAt(prop).Value<int64_t>(), 42);
}
TEST_F(DistributedGraphDbTest, Synchronize) {
auto from = InsertVertex(worker(1));
auto to = InsertVertex(worker(2));
InsertEdge(from, to, "et");
// Query: MATCH (n)--(m) SET m.prop = 2 RETURN n.prop
// This query ensures that a remote update gets applied and the local stuff
// gets reconstructed.
auto &db = master();
GraphDbAccessor dba{db};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// MATCH
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
// SET
auto literal = LITERAL(42);
auto prop = PROPERTY_PAIR("prop");
auto m_p = PROPERTY_LOOKUP("m", prop);
ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_;
auto set_m_p = std::make_shared<plan::SetProperty>(r_m.op_, m_p, literal);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_);
// Master-side PullRemote, Synchronize
auto pull_remote = std::make_shared<query::plan::PullRemote>(
nullptr, plan_id, std::vector<Symbol>{n.sym_});
auto synchronize =
std::make_shared<query::plan::Synchronize>(set_m_p, pull_remote, true);
// RETURN
auto n_p =
storage.Create<PropertyLookup>(storage.Create<Identifier>("n"), prop);
ctx.symbol_table_[*n_p->expression_] = n.sym_;
auto return_n_p = NEXPR("n.prop", n_p);
auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true);
ctx.symbol_table_[*return_n_p] = return_n_p_sym;
auto produce = MakeProduce(synchronize, return_n_p);
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
ASSERT_EQ(results.size(), 2);
ASSERT_EQ(results[0].size(), 1);
EXPECT_EQ(results[0][0].ValueInt(), 42);
ASSERT_EQ(results[1].size(), 1);
EXPECT_EQ(results[1][0].ValueInt(), 42);
// TODO test without advance command?
}
TEST_F(DistributedGraphDbTest, Create) {
// Query: UNWIND range(0, 1000) as x CREATE ()
auto &db = master();
GraphDbAccessor dba{db};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
auto range = FN("range", LITERAL(0), LITERAL(1000));
auto x = ctx.symbol_table_.CreateSymbol("x", true);
auto unwind = std::make_shared<plan::Unwind>(nullptr, range, x);
auto node = NODE("n");
ctx.symbol_table_[*node->identifier_] =
ctx.symbol_table_.CreateSymbol("n", true);
auto create = std::make_shared<query::plan::CreateNode>(unwind, node, true);
PullAll(create, dba, ctx.symbol_table_);
dba.Commit();
auto vertex_count = [](database::GraphDb &db) {
database::GraphDbAccessor dba{db};
auto vertices = dba.Vertices(false);
return std::distance(vertices.begin(), vertices.end());
};
EXPECT_GT(vertex_count(master()), 200);
EXPECT_GT(vertex_count(worker(1)), 200);
EXPECT_GT(vertex_count(worker(2)), 200);
}

View File

@ -1,5 +1,5 @@
#include "gtest/gtest.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "distributed_common.hpp"
@ -10,46 +10,50 @@
using namespace distributed;
using namespace database;
TEST_F(DistributedGraphDbTest, RemotePullTest) {
using Interpreter = query::Interpreter;
std::map<std::string, query::TypedValue> params = {};
class DistributedInterpretationTest : public DistributedGraphDbTest {
protected:
auto Run(const std::string &query) {
std::map<std::string, query::TypedValue> params = {};
GraphDbAccessor dba(master());
ResultStreamFaker result;
query::Interpreter interpreter_;
interpreter_(query, dba, params, false).PullAll(result);
dba.Commit();
return result.GetResults();
}
};
GraphDbAccessor dba(master());
TEST_F(DistributedInterpretationTest, RemotePullTest) {
auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1");
ASSERT_EQ(results.size(), 3 * 21);
ResultStreamFaker result;
Interpreter interpreter_;
interpreter_("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1", dba,
params, false)
.PullAll(result);
// Three instances (master + 2 workers) with 21 result each.
uint expected_results = 3U * 21;
ASSERT_EQ(result.GetHeader().size(), 1U);
EXPECT_EQ(result.GetHeader()[0], "1");
ASSERT_EQ(result.GetResults().size(), expected_results);
for (uint i = 0; i < expected_results; ++i) {
ASSERT_EQ(result.GetResults()[i].size(), 1U);
ASSERT_EQ(result.GetResults()[i][0].Value<int64_t>(), 1);
for (auto result : results) {
ASSERT_EQ(result.size(), 1U);
ASSERT_EQ(result[0].ValueInt(), 1);
}
}
TEST_F(DistributedGraphDbTest, RemotePullNoResultsTest) {
using Interpreter = query::Interpreter;
std::map<std::string, query::TypedValue> params = {};
GraphDbAccessor dba(master());
ResultStreamFaker result;
Interpreter interpreter_;
interpreter_("MATCH (n) RETURN n", dba, params, false).PullAll(result);
ASSERT_EQ(result.GetHeader().size(), 1U);
EXPECT_EQ(result.GetHeader()[0], "n");
ASSERT_EQ(result.GetResults().size(), 0U);
TEST_F(DistributedInterpretationTest, RemotePullNoResultsTest) {
auto results = Run("MATCH (n) RETURN n");
ASSERT_EQ(results.size(), 0U);
}
TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
TEST_F(DistributedInterpretationTest, CreateExpand) {
InsertVertex(master());
InsertVertex(worker(1));
InsertVertex(worker(1));
InsertVertex(worker(2));
InsertVertex(worker(2));
InsertVertex(worker(2));
Run("MATCH (n) CREATE (n)-[:T]->(m) RETURN n");
EXPECT_EQ(VertexCount(master()), 2);
EXPECT_EQ(VertexCount(worker(1)), 4);
EXPECT_EQ(VertexCount(worker(2)), 6);
}
TEST_F(DistributedInterpretationTest, RemoteExpandTest2) {
// Make a fully connected graph with vertices scattered across master and
// worker storage.
// Vertex count is low, because test gets exponentially slower. The expected
@ -70,21 +74,15 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
};
std::vector<std::string> edge_types;
edge_types.reserve(vertices.size() * vertices.size());
for (int i = 0; i < vertices.size(); ++i) {
for (int j = 0; j < vertices.size(); ++j) {
for (size_t i = 0; i < vertices.size(); ++i) {
for (size_t j = 0; j < vertices.size(); ++j) {
auto edge_type = get_edge_type(i, j);
edge_types.push_back(edge_type);
InsertEdge(vertices[i], vertices[j], edge_type);
}
}
query::Interpreter interpret;
std::map<std::string, query::TypedValue> params;
GraphDbAccessor dba(master());
ResultStreamFaker result;
interpret("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)", dba,
params, false)
.PullAll(result);
ASSERT_EQ(result.GetHeader().size(), 2U);
auto results = Run("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)");
// We expect the number of results to be:
size_t expected_result_size =
// pick (n)
@ -96,11 +94,11 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
(2 * vertices.size() - 1 - 1);
std::vector<std::vector<std::string>> expected;
expected.reserve(expected_result_size);
for (int n = 0; n < vertices.size(); ++n) {
for (int m = 0; m < vertices.size(); ++m) {
for (size_t n = 0; n < vertices.size(); ++n) {
for (size_t m = 0; m < vertices.size(); ++m) {
std::vector<std::string> r1s{get_edge_type(n, m)};
if (n != m) r1s.push_back(get_edge_type(m, n));
for (int l = 0; l < vertices.size(); ++l) {
for (size_t l = 0; l < vertices.size(); ++l) {
std::vector<std::string> r2s{get_edge_type(m, l)};
if (m != l) r2s.push_back(get_edge_type(l, m));
for (const auto &r1 : r1s) {
@ -113,10 +111,10 @@ TEST_F(DistributedGraphDbTest, RemoteExpandTest2) {
}
}
ASSERT_EQ(expected.size(), expected_result_size);
ASSERT_EQ(result.GetResults().size(), expected_result_size);
ASSERT_EQ(results.size(), expected_result_size);
std::vector<std::vector<std::string>> got;
got.reserve(result.GetResults().size());
for (const auto &res : result.GetResults()) {
got.reserve(results.size());
for (const auto &res : results) {
std::vector<std::string> row;
row.reserve(res.size());
for (const auto &col : res) {

View File

@ -0,0 +1,264 @@
#include <memory>
#include <thread>
#include <unordered_set>
#include "gtest/gtest.h"
#include "database/graph_db.hpp"
#include "distributed/coordination.hpp"
#include "distributed/coordination_master.hpp"
#include "distributed/coordination_worker.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/remote_data_rpc_clients.hpp"
#include "distributed/remote_data_rpc_server.hpp"
#include "distributed/remote_pull_rpc_clients.hpp"
#include "distributed_common.hpp"
#include "io/network/endpoint.hpp"
#include "query/frontend/ast/ast.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/semantic/symbol_generator.hpp"
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/interpreter.hpp"
#include "query/plan/planner.hpp"
#include "query/typed_value.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
#include "transactions/engine_master.hpp"
using namespace distributed;
using namespace database;
TEST_F(DistributedGraphDbTest, RemotePullProduceRpc) {
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// Query plan for: UNWIND [42, true, "bla", 1, 2] as x RETURN x
using namespace query;
auto list =
LIST(LITERAL(42), LITERAL(true), LITERAL("bla"), LITERAL(1), LITERAL(2));
auto x = ctx.symbol_table_.CreateSymbol("x", true);
auto unwind = std::make_shared<plan::Unwind>(nullptr, list, x);
auto x_expr = IDENT("x");
ctx.symbol_table_[*x_expr] = x;
auto x_ne = NEXPR("x", x_expr);
ctx.symbol_table_[*x_ne] = ctx.symbol_table_.CreateSymbol("x_ne", true);
auto produce = MakeProduce(unwind, x_ne);
// Test that the plan works locally.
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
ASSERT_EQ(results.size(), 5);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*x_ne]};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, false, 3);
};
auto expect_first_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state,
distributed::RemotePullState::CURSOR_IN_PROGRESS);
ASSERT_EQ(batch.frames.size(), 3);
ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 42);
EXPECT_EQ(batch.frames[1][0].ValueBool(), true);
EXPECT_EQ(batch.frames[2][0].ValueString(), "bla");
};
auto expect_second_batch = [](auto &batch) {
EXPECT_EQ(batch.pull_state, distributed::RemotePullState::CURSOR_EXHAUSTED);
ASSERT_EQ(batch.frames.size(), 2);
ASSERT_EQ(batch.frames[0].size(), 1);
EXPECT_EQ(batch.frames[0][0].ValueInt(), 1);
EXPECT_EQ(batch.frames[1][0].ValueInt(), 2);
};
GraphDbAccessor dba_1{master()};
GraphDbAccessor dba_2{master()};
for (int worker_id : {1, 2}) {
// TODO flor, proper test async here.
auto tx1_batch1 = remote_pull(dba_1, worker_id).get();
expect_first_batch(tx1_batch1);
auto tx2_batch1 = remote_pull(dba_2, worker_id).get();
expect_first_batch(tx2_batch1);
auto tx2_batch2 = remote_pull(dba_2, worker_id).get();
expect_second_batch(tx2_batch2);
auto tx1_batch2 = remote_pull(dba_1, worker_id).get();
expect_second_batch(tx1_batch2);
}
}
TEST_F(DistributedGraphDbTest, RemotePullProduceRpcWithGraphElements) {
// Create some data on the master and both workers. Eeach edge (3 of them) and
// vertex (6 of them) will be uniquely identified with their worker id and
// sequence ID, so we can check we retrieved all.
storage::Property prop;
{
GraphDbAccessor dba{master()};
prop = dba.Property("prop");
auto create_data = [prop](GraphDbAccessor &dba, int worker_id) {
auto v1 = dba.InsertVertex();
v1.PropsSet(prop, worker_id * 10);
auto v2 = dba.InsertVertex();
v2.PropsSet(prop, worker_id * 10 + 1);
auto e12 = dba.InsertEdge(v1, v2, dba.EdgeType("et"));
e12.PropsSet(prop, worker_id * 10 + 2);
};
create_data(dba, 0);
GraphDbAccessor dba_w1{worker(1), dba.transaction_id()};
create_data(dba_w1, 1);
GraphDbAccessor dba_w2{worker(2), dba.transaction_id()};
create_data(dba_w2, 2);
dba.Commit();
}
GraphDbAccessor dba{master()};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// Query plan for: MATCH p = (n)-[r]->(m) return [n, r], m, p
// Use this query to test graph elements are transferred correctly in
// collections too.
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::OUT, {}, "m", false, GraphView::OLD);
auto p_sym = ctx.symbol_table_.CreateSymbol("p", true);
auto p = std::make_shared<query::plan::ConstructNamedPath>(
r_m.op_, p_sym,
std::vector<Symbol>{n.sym_, r_m.edge_sym_, r_m.node_sym_});
auto return_n = IDENT("n");
ctx.symbol_table_[*return_n] = n.sym_;
auto return_r = IDENT("r");
ctx.symbol_table_[*return_r] = r_m.edge_sym_;
auto return_n_r = NEXPR("[n, r]", LIST(return_n, return_r));
ctx.symbol_table_[*return_n_r] = ctx.symbol_table_.CreateSymbol("", true);
auto return_m = NEXPR("m", IDENT("m"));
ctx.symbol_table_[*return_m->expression_] = r_m.node_sym_;
ctx.symbol_table_[*return_m] = ctx.symbol_table_.CreateSymbol("", true);
auto return_p = NEXPR("p", IDENT("p"));
ctx.symbol_table_[*return_p->expression_] = p_sym;
ctx.symbol_table_[*return_p] = ctx.symbol_table_.CreateSymbol("", true);
auto produce = MakeProduce(p, return_n_r, return_m, return_p);
auto check_result = [prop](
int worker_id,
const std::vector<std::vector<query::TypedValue>> &frames) {
int offset = worker_id * 10;
ASSERT_EQ(frames.size(), 1);
auto &row = frames[0];
ASSERT_EQ(row.size(), 3);
auto &list = row[0].ValueList();
ASSERT_EQ(list.size(), 2);
ASSERT_EQ(list[0].ValueVertex().PropsAt(prop).Value<int64_t>(), offset);
ASSERT_EQ(list[1].ValueEdge().PropsAt(prop).Value<int64_t>(), offset + 2);
ASSERT_EQ(row[1].ValueVertex().PropsAt(prop).Value<int64_t>(), offset + 1);
auto &path = row[2].ValuePath();
ASSERT_EQ(path.size(), 1);
ASSERT_EQ(path.vertices()[0].PropsAt(prop).Value<int64_t>(), offset);
ASSERT_EQ(path.edges()[0].PropsAt(prop).Value<int64_t>(), offset + 2);
ASSERT_EQ(path.vertices()[1].PropsAt(prop).Value<int64_t>(), offset + 1);
};
// Test that the plan works locally.
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
check_result(0, results);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, produce, ctx.symbol_table_);
Parameters params;
std::vector<query::Symbol> symbols{ctx.symbol_table_[*return_n_r],
ctx.symbol_table_[*return_m], p_sym};
auto remote_pull = [this, &params, &symbols](GraphDbAccessor &dba,
int worker_id) {
return master().remote_pull_clients().RemotePull(dba, worker_id, plan_id,
params, symbols, false, 3);
};
auto future_w1_results = remote_pull(dba, 1);
auto future_w2_results = remote_pull(dba, 2);
check_result(1, future_w1_results.get().frames);
check_result(2, future_w2_results.get().frames);
}
TEST_F(DistributedGraphDbTest, Synchronize) {
auto from = InsertVertex(worker(1));
auto to = InsertVertex(worker(2));
InsertEdge(from, to, "et");
// Query: MATCH (n)--(m) SET m.prop = 2 RETURN n.prop
// This query ensures that a remote update gets applied and the local stuff
// gets reconstructed.
auto &db = master();
GraphDbAccessor dba{db};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
// MATCH
auto n = MakeScanAll(storage, ctx.symbol_table_, "n");
auto r_m =
MakeExpand(storage, ctx.symbol_table_, n.op_, n.sym_, "r",
EdgeAtom::Direction::BOTH, {}, "m", false, GraphView::OLD);
// SET
auto literal = LITERAL(42);
auto prop = PROPERTY_PAIR("prop");
auto m_p = PROPERTY_LOOKUP("m", prop);
ctx.symbol_table_[*m_p->expression_] = r_m.node_sym_;
auto set_m_p = std::make_shared<plan::SetProperty>(r_m.op_, m_p, literal);
const int plan_id = 42;
master().plan_dispatcher().DispatchPlan(plan_id, set_m_p, ctx.symbol_table_);
// Master-side PullRemote, Synchronize
auto pull_remote = std::make_shared<query::plan::PullRemote>(
nullptr, plan_id, std::vector<Symbol>{n.sym_});
auto synchronize =
std::make_shared<query::plan::Synchronize>(set_m_p, pull_remote, true);
// RETURN
auto n_p =
storage.Create<PropertyLookup>(storage.Create<Identifier>("n"), prop);
ctx.symbol_table_[*n_p->expression_] = n.sym_;
auto return_n_p = NEXPR("n.prop", n_p);
auto return_n_p_sym = ctx.symbol_table_.CreateSymbol("n.p", true);
ctx.symbol_table_[*return_n_p] = return_n_p_sym;
auto produce = MakeProduce(synchronize, return_n_p);
auto results = CollectProduce(produce.get(), ctx.symbol_table_, dba);
ASSERT_EQ(results.size(), 2);
ASSERT_EQ(results[0].size(), 1);
EXPECT_EQ(results[0][0].ValueInt(), 42);
ASSERT_EQ(results[1].size(), 1);
EXPECT_EQ(results[1][0].ValueInt(), 42);
// TODO test without advance command?
}
TEST_F(DistributedGraphDbTest, Create) {
// Query: UNWIND range(0, 1000) as x CREATE ()
auto &db = master();
GraphDbAccessor dba{db};
Context ctx{dba};
SymbolGenerator symbol_generator{ctx.symbol_table_};
AstTreeStorage storage;
auto range = FN("range", LITERAL(0), LITERAL(1000));
auto x = ctx.symbol_table_.CreateSymbol("x", true);
auto unwind = std::make_shared<plan::Unwind>(nullptr, range, x);
auto node = NODE("n");
ctx.symbol_table_[*node->identifier_] =
ctx.symbol_table_.CreateSymbol("n", true);
auto create = std::make_shared<query::plan::CreateNode>(unwind, node, true);
PullAll(create, dba, ctx.symbol_table_);
dba.Commit();
EXPECT_GT(VertexCount(master()), 200);
EXPECT_GT(VertexCount(worker(1)), 200);
EXPECT_GT(VertexCount(worker(2)), 200);
}