memgraph/tests/unit/distributed_interpretation.cpp

324 lines
10 KiB
C++
Raw Normal View History

#include <chrono>
#include <optional>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "communication/result_stream_faker.hpp"
#include "database/distributed/graph_db.hpp"
#include "distributed/plan_consumer.hpp"
#include "distributed/plan_dispatcher.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed_common.hpp"
#include "query/distributed/interpreter.hpp"
#include "query_common.hpp"
#include "query_plan_common.hpp"
#include "utils/timer.hpp"
// We use this to ensure a cached plan is removed from the concurrent map and
// properly destructed.
DECLARE_int32(skiplist_gc_interval);
using namespace distributed;
using namespace database;
using namespace std::literals::chrono_literals;
class DistributedInterpretationTest : public DistributedGraphDbTest {
protected:
DistributedInterpretationTest() : DistributedGraphDbTest("interpretation") {}
void SetUp() override {
DistributedGraphDbTest::SetUp();
interpreter_.emplace(&master());
}
void TearDown() override {
interpreter_ = std::nullopt;
DistributedGraphDbTest::TearDown();
}
auto RunWithDba(const std::string &query, GraphDbAccessor &dba) {
Clean-up TypedValue misuse Summary: In a bunch of places `TypedValue` was used where `PropertyValue` should be. A lot of times it was only because `TypedValue` serialization code could be reused for `PropertyValue`, only without providing callbacks for `VERTEX`, `EDGE` and `PATH`. So first I wrote separate serialization code for `PropertyValue` and put it into storage folder. Then I fixed all the places where `TypedValue` was incorrectly used instead of `PropertyValue`. I also disabled implicit `TypedValue` to `PropertyValue` conversion in hopes of preventing misuse in the future. After that, I wrote code for `VertexAccessor` and `EdgeAccessor` serialization and put it into `storage` folder because it was almost duplicated in distributed BFS and pull produce RPC messages. On the sender side, some subset of records (old or new or both) is serialized, and on the reciever side, records are deserialized and immediately put into transaction cache. Then I rewrote the `TypedValue` serialization functions (`SaveCapnpTypedValue` and `LoadCapnpTypedValue`) to not take callbacks for `VERTEX`, `EDGE` and `PATH`, but use accessor serialization functions instead. That means that any code that wants to use `TypedValue` serialization must hold a reference to `GraphDbAccessor` and `DataManager`, so that should make clients reconsider if they really want to use `TypedValue` instead of `PropertyValue`. Reviewers: teon.banek, msantl Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1598
2018-09-13 18:12:07 +08:00
std::map<std::string, PropertyValue> params = {};
ResultStreamFaker<query::TypedValue> stream;
auto results = interpreter_.value()(query, dba, params, false);
stream.Header(results.header());
results.PullAll(stream);
stream.Summary(results.summary());
return stream.GetResults();
}
auto Run(const std::string &query) {
auto dba = master().Access();
auto results = RunWithDba(query, *dba);
dba->Commit();
return results;
}
private:
std::optional<query::DistributedInterpreter> interpreter_;
};
TEST_F(DistributedInterpretationTest, PullTest) {
auto results = Run("OPTIONAL MATCH(n) UNWIND(RANGE(0, 20)) AS X RETURN 1");
ASSERT_EQ(results.size(), 1 * 21);
for (auto result : results) {
ASSERT_EQ(result.size(), 1U);
ASSERT_EQ(result[0].ValueInt(), 1);
}
}
TEST_F(DistributedInterpretationTest, PullNoResultsTest) {
auto results = Run("MATCH (n) RETURN n");
ASSERT_EQ(results.size(), 0U);
}
TEST_F(DistributedInterpretationTest, CreateExpand) {
InsertVertex(master());
InsertVertex(worker(1));
InsertVertex(worker(1));
InsertVertex(worker(2));
InsertVertex(worker(2));
InsertVertex(worker(2));
Run("MATCH (n) CREATE (n)-[:T]->(m) RETURN n");
EXPECT_EQ(VertexCount(master()), 2);
EXPECT_EQ(VertexCount(worker(1)), 4);
EXPECT_EQ(VertexCount(worker(2)), 6);
}
TEST_F(DistributedInterpretationTest, RemoteExpandTest2) {
// Make a fully connected graph with vertices scattered across master and
// worker storage.
// Vertex count is low, because test gets exponentially slower. The expected
// result size is ~ vertices^3, and then that is compared at the end in no
// particular order which causes O(result_size^2) comparisons.
int verts_per_storage = 3;
std::vector<storage::VertexAddress> vertices;
vertices.reserve(verts_per_storage * 3);
auto add_vertices = [this, &vertices, &verts_per_storage](auto &db) {
for (int i = 0; i < verts_per_storage; ++i)
vertices.push_back(InsertVertex(db));
};
add_vertices(master());
add_vertices(worker(1));
add_vertices(worker(2));
auto get_edge_type = [](int v1, int v2) {
return std::to_string(v1) + "-" + std::to_string(v2);
};
std::vector<std::string> edge_types;
edge_types.reserve(vertices.size() * vertices.size());
for (size_t i = 0; i < vertices.size(); ++i) {
for (size_t j = 0; j < vertices.size(); ++j) {
auto edge_type = get_edge_type(i, j);
edge_types.push_back(edge_type);
InsertEdge(vertices[i], vertices[j], edge_type);
}
}
auto results = Run("MATCH (n)-[r1]-(m)-[r2]-(l) RETURN type(r1), type(r2)");
// We expect the number of results to be:
size_t expected_result_size =
// pick (n)
vertices.size() *
// pick both directed edges to other (m) and a
// single edge to (m) which equals (n), hence -1
(2 * vertices.size() - 1) *
// Pick as before, but exclude the previously taken edge, hence another -1
(2 * vertices.size() - 1 - 1);
std::vector<std::vector<std::string>> expected;
expected.reserve(expected_result_size);
for (size_t n = 0; n < vertices.size(); ++n) {
for (size_t m = 0; m < vertices.size(); ++m) {
std::vector<std::string> r1s{get_edge_type(n, m)};
if (n != m) r1s.push_back(get_edge_type(m, n));
for (size_t l = 0; l < vertices.size(); ++l) {
std::vector<std::string> r2s{get_edge_type(m, l)};
if (m != l) r2s.push_back(get_edge_type(l, m));
for (const auto &r1 : r1s) {
for (const auto &r2 : r2s) {
if (r1 == r2) continue;
expected.push_back({r1, r2});
}
}
}
}
}
ASSERT_EQ(expected.size(), expected_result_size);
ASSERT_EQ(results.size(), expected_result_size);
std::vector<std::vector<std::string>> got;
got.reserve(results.size());
for (const auto &res : results) {
std::vector<std::string> row;
row.reserve(res.size());
for (const auto &col : res) {
row.push_back(col.ValueString());
}
got.push_back(row);
}
ASSERT_THAT(got, testing::UnorderedElementsAreArray(expected));
}
TEST_F(DistributedInterpretationTest, Cartesian) {
// Create some data on the master and both workers.
storage::Property prop;
{
auto dba = master().Access();
auto tx_id = dba->transaction_id();
auto dba1 = worker(1).Access(tx_id);
auto dba2 = worker(2).Access(tx_id);
prop = dba->Property("prop");
auto add_data = [prop](GraphDbAccessor &dba, int value) {
dba.InsertVertex().PropsSet(prop, value);
};
for (int i = 0; i < 10; ++i) add_data(*dba, i);
for (int i = 10; i < 20; ++i) add_data(*dba1, i);
for (int i = 20; i < 30; ++i) add_data(*dba2, i);
dba->Commit();
}
std::vector<std::vector<int64_t>> expected;
for (int64_t i = 0; i < 30; ++i)
for (int64_t j = 0; j < 30; ++j) expected.push_back({i, j});
auto results = Run("MATCH (n), (m) RETURN n.prop, m.prop;");
size_t expected_result_size = 30 * 30;
ASSERT_EQ(expected.size(), expected_result_size);
ASSERT_EQ(results.size(), expected_result_size);
std::vector<std::vector<int64_t>> got;
got.reserve(results.size());
for (const auto &res : results) {
std::vector<int64_t> row;
row.reserve(res.size());
for (const auto &col : res) {
row.push_back(col.Value<int64_t>());
}
got.push_back(row);
}
ASSERT_THAT(got, testing::UnorderedElementsAreArray(expected));
}
class TestQueryWaitsOnFutures : public DistributedInterpretationTest {
protected:
int QueryExecutionTimeSec(int worker_id) override {
return worker_id == 2 ? 3 : 1;
}
};
TEST_F(TestQueryWaitsOnFutures, Test) {
const int kVertexCount = 10;
auto make_fully_connected = [](database::GraphDb &db) {
auto dba = db.Access();
std::vector<VertexAccessor> vertices;
for (int i = 0; i < kVertexCount; ++i)
vertices.emplace_back(dba->InsertVertex());
auto et = dba->EdgeType("et");
for (auto &from : vertices)
for (auto &to : vertices) dba->InsertEdge(from, to, et);
dba->Commit();
};
make_fully_connected(worker(1));
ASSERT_EQ(VertexCount(worker(1)), kVertexCount);
ASSERT_EQ(EdgeCount(worker(1)), kVertexCount * kVertexCount);
{
utils::Timer timer;
try {
Run("MATCH ()--()--()--()--()--()--() RETURN count(1)");
} catch (...) {
}
double seconds = timer.Elapsed().count();
EXPECT_GT(seconds, 1);
EXPECT_LT(seconds, 2);
}
make_fully_connected(worker(2));
ASSERT_EQ(VertexCount(worker(2)), kVertexCount);
ASSERT_EQ(EdgeCount(worker(2)), kVertexCount * kVertexCount);
{
utils::Timer timer;
try {
Run("MATCH ()--()--()--()--()--()--() RETURN count(1)");
} catch (...) {
}
double seconds = timer.Elapsed().count();
EXPECT_GT(seconds, 3);
}
}
TEST_F(DistributedInterpretationTest, PlanExpiration) {
FLAGS_query_plan_cache_ttl = 1;
Run("MATCH (n) RETURN n");
auto ids1 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids1.size(), 1);
// Sleep so the cached plan becomes invalid.
std::this_thread::sleep_for(std::chrono::milliseconds(1100));
Run("MATCH (n) RETURN n");
// Sleep so the invalidated plan (removed from cache which is a concurrent
// map) gets destructed and thus remote caches cleared.
std::this_thread::sleep_for(std::chrono::milliseconds(1500));
auto ids2 = worker(1).plan_consumer().CachedPlanIds();
ASSERT_EQ(ids2.size(), 1);
EXPECT_NE(ids1, ids2);
}
TEST_F(DistributedInterpretationTest, ConcurrentPlanExpiration) {
FLAGS_query_plan_cache_ttl = 1;
auto count_vertices = [this]() {
utils::Timer timer;
while (timer.Elapsed() < 3s) {
Run("MATCH () RETURN count(1)");
}
};
std::vector<std::thread> counters;
for (size_t i = 0; i < std::thread::hardware_concurrency(); ++i)
counters.emplace_back(count_vertices);
for (auto &t : counters) t.join();
}
TEST_F(DistributedInterpretationTest, OngoingProduceKeyTest) {
int worker_count = 10;
for (int i = 0; i < worker_count; ++i) {
InsertVertex(master());
InsertVertex(worker(1));
InsertVertex(worker(2));
}
auto dba = master().Access();
auto count1 = RunWithDba("MATCH (n) RETURN count(n)", *dba);
dba->AdvanceCommand();
auto count2 = RunWithDba("MATCH (n) RETURN count(n)", *dba);
ASSERT_EQ(count1[0][0].ValueInt(), 3 * worker_count);
ASSERT_EQ(count2[0][0].ValueInt(), 3 * worker_count);
}
TEST_F(DistributedInterpretationTest, AdvanceCommandOnWorkers) {
auto dba = master().Access();
RunWithDba("UNWIND RANGE(1, 10) as x CREATE (:A {id: x})", *dba);
dba->AdvanceCommand();
// Advance commands on workers also.
Split GraphDb to distributed and single node files Summary: This change, hopefully, simplifies the implementation of different kinds of GraphDb. The pimpl idiom is now simplified by removing all of the crazy inheritance. Implementations classes are just plain data stores, without any methods. The interface classes now have a more flat hierarchy: ``` GraphDb (pure interface) | +----+---------- DistributedGraphDb (pure interface) | | Single Node +-----+------+ | | Master Worker ``` DistributedGraphDb is used as an intermediate interface for all the things that should work only in distributed. Therefore, virtual calls for distributed stuff have been removed from GraphDb. Some are exposed via DistributedGraphDb, other's are only in concrete Master and Worker classes. The code which relied on those virtual calls has been refactored to either use DistributedGraphDb, take a pointer to what is actually needed or use dynamic_cast. Obviously, dynamic_cast is a temporary solution and should be replaced with another mechanism (e.g. virtual call, or some other function pointer style). The cost of the above change is some code duplication in constructors and destructors of classes. This duplication has a lot of little tweaks that make it hard to generalize, not to mention that virtual calls do not work in constructor and destructor. If we really care about generalizing this, we should think about abandoning RAII in favor of constructor + Init method. The next steps for splitting the dependencies that seem logical are: 1) Split GraphDbAccessor implementation, either via inheritance or passing in an implementation pointer. GraphDbAccessor should then only be created by a virtual call on GraphDb. 2) Split Interpreter implementation. Besides allowing single node interpreter to exist without depending on distributed, this will enable the planner and operators to be correctly separated. Reviewers: msantl, mferencevic, ipaljak Reviewed By: msantl Subscribers: dgleich, pullbot Differential Revision: https://phabricator.memgraph.io/D1493
2018-07-19 23:00:50 +08:00
auto futures = master().pull_clients().NotifyAllTransactionCommandAdvanced(
dba->transaction_id());
for (auto &future : futures) future.wait();
auto count = RunWithDba("MATCH (n) RETURN count(n)", *dba);
ASSERT_EQ(count[0][0].ValueInt(), 10);
}
int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
::testing::InitGoogleTest(&argc, argv);
gflags::ParseCommandLineFlags(&argc, &argv, true);
FLAGS_skiplist_gc_interval = 1;
return RUN_ALL_TESTS();
}