Add index support in HA

Summary:
Added index creation and deletion handling in StateDelta.
Also included an integration test that creates an index and verifies that it
gets replicated: each peer is killed in turn, eventually forcing a leader re-election.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1886
This commit is contained in:
Matija Santl 2019-02-25 14:55:26 +01:00
parent f685c08b7b
commit 4790d6458e
9 changed files with 215 additions and 9 deletions

View File

@ -20,10 +20,6 @@ void StateDeltaApplier::Apply(const std::vector<StateDelta> &deltas) {
LOG(FATAL) << "StateDeltaApplier shouldn't know about aborted "
"transactions";
break;
case StateDelta::Type::BUILD_INDEX:
case StateDelta::Type::DROP_INDEX:
throw utils::NotYetImplemented(
"High availability doesn't support index at the moment!");
default:
delta.Apply(*GetAccessor(delta.transaction_id));
}

View File

@ -336,9 +336,13 @@ void StateDelta::Apply(GraphDbAccessor &dba) const {
dba.RemoveEdge(edge);
break;
}
case Type::BUILD_INDEX:
case Type::BUILD_INDEX: {
dba.BuildIndex(dba.Label(label_name), dba.Property(property_name),
unique);
break;
}
case Type::DROP_INDEX: {
  // NOTE(review): the previous LOG(FATAL) aborted the process before
  // DeleteIndex could run, making the index-drop delta a no-op crash.
  // Dropping the index is the whole point of applying this delta.
  dba.DeleteIndex(dba.Label(label_name), dba.Property(property_name));
  break;
}
case Type::NO_OP:

View File

@ -130,7 +130,7 @@ void RaftServer::Start() {
// [Raft paper 5.1]
// "If a server receives a request with a stale term, it rejects the
// request"
if (exiting_ || req.term < current_term_) {
if (req.term < current_term_) {
AppendEntriesRes res(false, current_term_);
Save(res, res_builder);
return;

View File

@ -19,5 +19,8 @@ add_subdirectory(distributed)
# distributed ha/basic binaries
add_subdirectory(ha/basic)
# distributed ha/index binaries
add_subdirectory(ha/index)
# audit test binaries
add_subdirectory(audit)

View File

@ -66,9 +66,19 @@
- runner.py # runner script
- raft.json # raft configuration
- ../ha_test.py # raft test base module
- ../../../../build_debug/memgraph_ha # memgraph distributed binary
- ../../../../build_debug/memgraph_ha # memgraph ha binary
- ../../../../build_debug/tests/integration/ha/basic/tester # tester binary
- name: integration__ha_index
cd: ha/index
commands: ./runner.py
infiles:
- runner.py # runner script
- raft.json # raft configuration
- ../ha_test.py # raft test base module
- ../../../../build_debug/memgraph_ha # memgraph ha binary
- ../../../../build_debug/tests/integration/ha/index/tester # tester binary
- name: integration__ha_log_compaction
cd: ha/log_compaction
commands: ./runner.py
@ -76,5 +86,5 @@
- runner.py # runner script
- raft.json # raft configuration
- ../ha_test.py # raft test base module
- ../../../../build_debug/memgraph_ha # memgraph distributed binary
- ../../../../build_debug/memgraph_ha # memgraph ha binary
- ../../../../build_debug/tests/manual/ha_client # tester binary

View File

@ -0,0 +1,6 @@
# Build the tester client for the HA index integration test.
set(target_name memgraph__integration__ha_index)
set(tester_target_name ${target_name}__tester)
# The binary is produced as "tester" so the runner script can find it
# at tests/integration/ha/index/tester.
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)

View File

@ -0,0 +1,6 @@
{
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"log_size_snapshot_threshold": -1
}

View File

@ -0,0 +1,93 @@
#!/usr/bin/python3
import argparse
import os
import time
import random
import subprocess
import sys
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
from ha_test import HaTestBase
class HaIndexTest(HaTestBase):
    """HA integration test: build an index on the leader and verify that it
    is replicated to every peer, killing one worker at a time to force
    leader re-elections along the way."""

    def execute_step(self, step):
        """Run the tester binary for `step` ("create" or "check").

        Returns the tester's exit code, 0 for an unknown step, or 1 when the
        tester does not finish within 30 seconds."""
        if step == "create":
            print("Executing create query")
        elif step == "check":
            print("Executing check query")
        else:
            # Unknown step: nothing to run.
            return 0

        client = subprocess.Popen(
            [self.tester_binary, "--step", step,
             "--cluster_size", str(self.cluster_size)])

        # Check what happened with query execution.
        try:
            return client.wait(timeout=30)
        except subprocess.TimeoutExpired:
            print("HA client timed out!")
            client.kill()
            return 1

    def execute(self):
        """Create the index, then repeatedly kill/restart each worker and
        assert the index is still visible on the remaining cluster."""
        self.start_cluster()

        assert self.execute_step("create") == 0, \
            "Error while executing create query"

        for worker in range(self.cluster_size):
            # Kill worker.
            print("Killing worker {}".format(worker))
            self.kill_worker(worker)
            time.sleep(5)  # allow some time for possible leader re-election

            assert self.execute_step("check") == 0, \
                "Error while executing check query"

            # Bring worker back to life.
            print("Starting worker {}".format(worker))
            self.start_worker(worker)
            time.sleep(5)  # allow some time for possible leader re-election
def find_correct_path(path):
    """Resolve `path` under PROJECT_DIR/build, falling back to
    PROJECT_DIR/build_debug when the release build does not exist."""
    candidate = os.path.join(PROJECT_DIR, "build", path)
    if os.path.exists(candidate):
        return candidate
    return os.path.join(PROJECT_DIR, "build_debug", path)
if __name__ == "__main__":
    # Locate the memgraph HA binary, the tester client and the Raft config
    # relative to the repository root.
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
                                                   "index", "tester"))
    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
                                    "index", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    # Run the test with two cluster sizes (tolerating 1 and 2 failures).
    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))

        # NOTE(review): the constructed instance is discarded — presumably
        # HaTestBase.__init__ runs the test itself; confirm, otherwise a
        # .run()/.execute() call is missing here.
        HaIndexTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)

        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)

View File

@ -0,0 +1,88 @@
#include <algorithm>
#include <chrono>
#include <thread>
#include <vector>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "The step to execute (available: create, check)");
using namespace std::chrono_literals;
using communication::bolt::Value;
/// Tester client for the HA index integration test.
///
/// Depending on --step it either creates the index ":Node(id)" on the
/// current Raft leader ("create") or asserts that the index exists
/// ("check"). The leader is found by probing every cluster member.
///
/// Exit codes: 0 on success, 1 when no leader could be found,
/// 2 when the index is missing during "check".
///
/// NOTE(review): this file used std::any_of without including
/// <algorithm> (relying on a transitive include); fixed in the
/// include block.
int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  // The index the "create" step builds and the "check" step looks for.
  const std::string index = ":Node(id)";

  communication::Init();

  // Probe every cluster member looking for the leader; retry up to 10
  // rounds with a 1s pause, since an election may be in progress.
  bool successful = false;
  for (int retry = 0; !successful && retry < 10; ++retry) {
    for (int i = 0; !successful && i < FLAGS_cluster_size; ++i) {
      try {
        communication::ClientContext context(FLAGS_use_ssl);
        communication::bolt::Client client(&context);

        // Members listen on consecutive ports starting at --port.
        uint16_t port = FLAGS_port + i;
        io::network::Endpoint endpoint{FLAGS_address, port};
        client.Connect(endpoint, FLAGS_username, FLAGS_password);

        if (FLAGS_step == "create") {
          client.Execute(fmt::format("create index on {}", index), {});
          successful = true;
        } else if (FLAGS_step == "check") {
          auto result = client.Execute("show index info", {});
          auto checker = [&index](const std::vector<Value> &record) {
            if (record.size() != 1) return false;
            return record[0].ValueString() == index;
          };

          // Check that index ":Node(id)" exists
          if (!std::any_of(result.records.begin(), result.records.end(),
                           checker)) {
            LOG(WARNING) << "Missing index!";
            return 2;
          }
          successful = true;
        } else {
          LOG(FATAL) << "Unexpected client step!";
        }
      } catch (const communication::bolt::ClientQueryException &) {
        // This one is not the leader, continue.
        continue;
      } catch (const communication::bolt::ClientFatalException &) {
        // This one seems to be down, continue.
        continue;
      }
      // Reached only when the connection and the step both succeeded.
      LOG(INFO) << "Current Raft cluster leader is " << i;
    }

    if (!successful) {
      LOG(INFO) << "Couldn't find Raft cluster leader, retrying.";
      std::this_thread::sleep_for(1s);
    }
  }

  if (!successful) {
    LOG(WARNING) << "Couldn't find Raft cluster leader.";
    return 1;
  }

  return 0;
}