Fix Raft's ReplicationLog

Summary:
`ReplicationLog` had a classic off-by-one bug. The `valid_prefix`
variable wasn't set properly.

This diff also includes a poor man's version of a HA client. This client
assumes that all the HA instances run on a single machine and that the
corresponding Bold endpoints have open ports ranging from `7687` to
`7687 + num_machines - 1`.

This should make it easeir to test certain things, ie. disk usage, P25.

This test revealed the bug with `ReplicationLog`

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1813
This commit is contained in:
Matija Santl 2019-01-16 17:06:08 +01:00
parent 743d82b78d
commit 4fa44c3edd
4 changed files with 115 additions and 3 deletions

View File

@ -38,8 +38,10 @@ class ReplicationLog final {
// Clears the replication log from bits associated with transactions with an
// id lower than `id`.
void garbage_collect_older(tx::TransactionId id) {
// We keep track of the valid prefix in order to avoid the `CHECK` inside
// the `DynamicBitset`.
valid_prefix = 2 * id;
log.delete_prefix(2 * id);
valid_prefix = 2 * (id + 1);
}
class Info final {

View File

@ -54,6 +54,9 @@ target_link_libraries(${test_prefix}generate_snapshot mg-distributed kvstore_dum
add_manual_test(graph_500_generate_snapshot.cpp)
target_link_libraries(${test_prefix}graph_500_generate_snapshot mg-distributed kvstore_dummy_lib)
add_manual_test(ha_client.cpp)
target_link_libraries(${test_prefix}ha_client mg-utils mg-communication)
add_manual_test(kvstore_console.cpp)
target_link_libraries(${test_prefix}kvstore_console kvstore_lib gflags glog)

106
tests/manual/ha_client.cpp Normal file
View File

@ -0,0 +1,106 @@
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
using namespace std::chrono_literals;
void Execute(const std::vector<std::string> &queries) {
communication::ClientContext context(FLAGS_use_ssl);
std::experimental::optional<communication::bolt::Client> client;
communication::bolt::QueryData result;
for (size_t k = 0; k < queries.size();) {
if (!client) {
// Find the leader by executing query on all machines until one responds
// with success.
for (int retry = 0; !client && retry < 10; ++retry) {
for (int i = 0; !client && i < FLAGS_cluster_size; ++i) {
try {
client.emplace(&context);
uint16_t port = FLAGS_port + i;
io::network::Endpoint endpoint{FLAGS_address, port};
client->Connect(endpoint, FLAGS_username, FLAGS_password);
result = client->Execute(queries[k], {});
} catch (const communication::bolt::ClientQueryException &) {
// This one is not the leader, continue.
client = std::experimental::nullopt;
continue;
} catch (const communication::bolt::ClientFatalException &) {
// This one seems to be down, continue.
client = std::experimental::nullopt;
continue;
}
}
if (!client) {
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
std::this_thread::sleep_for(1s);
}
}
if (!client) {
LOG(ERROR) << "Couldn't find Raft cluster leader.";
return;
}
} else {
// Try reusing the previous client.
try {
result = client->Execute(queries[k], {});
} catch (const communication::bolt::ClientQueryException &) {
client = std::experimental::nullopt;
continue;
} catch (const communication::bolt::ClientFatalException &e) {
client = std::experimental::nullopt;
continue;
}
}
if (result.records.size() > 0) {
std::cout << "Results: " << std::endl;
for (auto &record : result.records) {
std::cout << record << std::endl;
}
}
k += 1;
}
return;
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
communication::Init();
std::vector<std::string> queries;
std::string line;
while (std::getline(std::cin, line)) {
queries.emplace_back(std::move(line));
}
Execute(queries);
return 0;
}

View File

@ -49,17 +49,18 @@ TEST(ReplicationLog, GarbageCollect) {
rlog.garbage_collect_older(n);
for (int i = 1; i <= n; ++i) {
for (int i = 1; i < n; ++i) {
EXPECT_FALSE(rlog.is_active(i));
EXPECT_FALSE(rlog.is_replicated(i));
}
for (int i = n + 1; i < 2 * n; ++i) {
for (int i = n; i < 2 * n; ++i) {
EXPECT_FALSE(rlog.is_active(i));
EXPECT_TRUE(rlog.is_replicated(i));
}
for (int i = 2 * n; i < 3 * n; ++i) {
EXPECT_TRUE(rlog.is_active(i));
EXPECT_FALSE(rlog.is_replicated(i));
}
}