diff --git a/.github/workflows/diff.yaml b/.github/workflows/diff.yaml index 7ba4747b0..a8e8aa502 100644 --- a/.github/workflows/diff.yaml +++ b/.github/workflows/diff.yaml @@ -289,6 +289,15 @@ jobs: tests/gql_behave/gql_behave_status.csv tests/gql_behave/gql_behave_status.html + - name: Run e2e replication tests + run: | + # TODO(gitbuda): Setup mgclient and pymgclient properly. + cd tests + ./setup.sh + source ve3/bin/activate + cd e2e + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml + - name: Run stress test (plain) run: | cd tests/stress diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 57fc0e452..e5dce6a8e 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -281,6 +281,15 @@ jobs: tests/gql_behave/gql_behave_status.csv tests/gql_behave/gql_behave_status.html + - name: Run e2e replication tests + run: | + # TODO(gitbuda): Setup mgclient and pymgclient properly. + cd tests + ./setup.sh + source ve3/bin/activate + cd e2e + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml + - name: Run stress test (plain) run: | cd tests/stress diff --git a/.github/workflows/release_centos.yaml b/.github/workflows/release_centos.yaml index 050b00120..93dd8eaeb 100644 --- a/.github/workflows/release_centos.yaml +++ b/.github/workflows/release_centos.yaml @@ -280,6 +280,15 @@ jobs: tests/gql_behave/gql_behave_status.csv tests/gql_behave/gql_behave_status.html + - name: Run e2e replication tests + run: | + # TODO(gitbuda): Setup mgclient and pymgclient properly. 
+ cd tests + ./setup.sh + source ve3/bin/activate + cd e2e + LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml + - name: Run stress test (plain) run: | cd tests/stress diff --git a/init b/init index 3119a5fc4..d5510a7e6 100755 --- a/init +++ b/init @@ -80,7 +80,7 @@ echo \ (ql:quickload '(:lcp :lcp/test) :silent t) " | sbcl --script -# setup libs (download) +# Setup libs (download). cd libs ./cleanup.sh ./setup.sh @@ -95,6 +95,14 @@ setup_virtualenv tests/stress # setup integration/ldap dependencies setup_virtualenv tests/integration/ldap +# Setup tests dependencies. +# cd tests +# ./setup.sh +# cd .. +# TODO(gitbuda): Remove setup_virtualenv, replace it with tests/ve3. Take care +# of the build order because tests/setup.py builds pymgclient which depends on +# mgclient which is build after this script by calling make. + echo "Done installing dependencies for Memgraph" echo "Linking git hooks" diff --git a/libs/setup.sh b/libs/setup.sh index adaccedd1..205d232c9 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -117,6 +117,10 @@ clone https://github.com/facebook/rocksdb.git rocksdb $rocksdb_tag sed -i 's/TARGETS ${ROCKSDB_SHARED_LIB}/TARGETS ${ROCKSDB_SHARED_LIB} OPTIONAL/' rocksdb/CMakeLists.txt # mgclient -mgclient_tag="fe94b3631385ef5dbe40a3d8458860dbcc33e6ea" # May 27, 2019 +mgclient_tag="v1.2.0" # (2021-01-14) clone https://github.com/memgraph/mgclient.git mgclient $mgclient_tag sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt + +# pymgclient +pymgclient_tag="4f85c179e56302d46a1e3e2cf43509db65f062b3" # (2021-01-15) +clone https://github.com/memgraph/pymgclient.git pymgclient $pymgclient_tag diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index cfd79d404..02535dcc7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -22,8 +22,8 @@ add_subdirectory(property_based) # integration test binaries add_subdirectory(integration) -# feature benchmark test binaries 
-add_subdirectory(feature_benchmark) +# e2e test binaries +add_subdirectory(e2e) # mgbench benchmark test binaries add_subdirectory(mgbench) diff --git a/tests/e2e/.gitignore b/tests/e2e/.gitignore new file mode 100644 index 000000000..a6c57f5fb --- /dev/null +++ b/tests/e2e/.gitignore @@ -0,0 +1 @@ +*.json diff --git a/tests/e2e/CMakeLists.txt b/tests/e2e/CMakeLists.txt new file mode 100644 index 000000000..29749fd19 --- /dev/null +++ b/tests/e2e/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(replication) diff --git a/tests/e2e/memgraph.py b/tests/e2e/memgraph.py new file mode 100755 index 000000000..99bf32f22 --- /dev/null +++ b/tests/e2e/memgraph.py @@ -0,0 +1,89 @@ +import copy +import os +import subprocess +import sys +import tempfile +import time + +import mgclient + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) +BUILD_DIR = os.path.join(PROJECT_DIR, "build") +MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph") + + +def wait_for_server(port, delay=0.01): + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] + count = 0 + while subprocess.call(cmd) != 0: + time.sleep(0.01) + if count > 10 / 0.01: + print("Could not wait for server on port", port, "to startup!") + sys.exit(1) + count += 1 + time.sleep(delay) + + +def extract_bolt_port(args): + for arg_index, arg in enumerate(args): + if arg.startswith('--bolt-port='): + maybe_port = arg.split('=')[1] + if not maybe_port.isdigit(): + raise Exception('Unable to read Bolt port after --bolt-port=.') + return int(maybe_port) + elif arg == '--bolt-port': + maybe_port = args[arg_index + 1] + if not maybe_port.isdigit(): + raise Exception('Unable to read Bolt port after --bolt-port.') + return int(maybe_port) + return 7687 + + +class MemgraphInstanceRunner(): + def __init__(self, binary_path=MEMGRAPH_BINARY, args=[]): + self.host = '127.0.0.1' + self.bolt_port = extract_bolt_port(args) + self.binary_path = binary_path + self.args = 
args + self.proc_mg = None + self.conn = None + + def query(self, query): + cursor = self.conn.cursor() + cursor.execute(query) + return cursor.fetchall() + + def start(self, restart=False, args=[]): + if not restart and self.is_running(): + return + self.stop() + self.args = copy.deepcopy(args) + self.data_directory = tempfile.TemporaryDirectory() + args_mg = [self.binary_path, + "--data-directory", self.data_directory.name, + "--storage-wal-enabled", + "--storage-snapshot-interval-sec", "300", + "--storage-properties-on-edges"] + self.args + self.bolt_port = extract_bolt_port(args_mg) + self.proc_mg = subprocess.Popen(args_mg) + wait_for_server(self.bolt_port) + self.conn = mgclient.connect( + host=self.host, + port=self.bolt_port) + self.conn.autocommit = True + assert self.is_running(), "The Memgraph process died!" + + def is_running(self): + if self.proc_mg is None: + return False + if self.proc_mg.poll() is not None: + return False + return True + + def stop(self): + if not self.is_running(): + return + self.proc_mg.terminate() + code = self.proc_mg.wait() + assert code == 0, "The Memgraph process exited with non-zero!" 
diff --git a/tests/e2e/replication/CMakeLists.txt b/tests/e2e/replication/CMakeLists.txt new file mode 100644 index 000000000..2cf5cb6d7 --- /dev/null +++ b/tests/e2e/replication/CMakeLists.txt @@ -0,0 +1,5 @@ +add_executable(memgraph__e2e__replication__constraints constraints.cpp) +target_link_libraries(memgraph__e2e__replication__constraints glog gflags mgclient mg-utils mg-io Threads::Threads) + +add_executable(memgraph__e2e__replication__read_write_benchmark read_write_benchmark.cpp) +target_link_libraries(memgraph__e2e__replication__read_write_benchmark glog gflags json mgclient mg-utils mg-io Threads::Threads) diff --git a/tests/e2e/replication/common.hpp b/tests/e2e/replication/common.hpp new file mode 100644 index 000000000..74266155c --- /dev/null +++ b/tests/e2e/replication/common.hpp @@ -0,0 +1,71 @@ +#include +#include + +#include +#include + +#include "io/network/endpoint.hpp" +#include "mgclient.hpp" +#include "utils/string.hpp" + +DEFINE_string(database_endpoints, + "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689", + "An array of database endspoints. Each endpoint is separated by " + "comma. Within each endpoint, colon separates host and port. Use " + "IPv4 addresses as hosts. 
First endpoint represents main " + "replication instance."); +DEFINE_string(username, "", "Database username."); +DEFINE_string(password, "", "Database password."); +DEFINE_bool(use_ssl, false, "Use SSL connection."); +DEFINE_int32(nodes, 1000, "Number of nodes in DB."); +DEFINE_int32(edges, 5000, "Number of edges in DB."); +DEFINE_double(reads_duration_limit, 10.0, + "How long should the client perform reads (seconds)"); + +namespace mg::e2e::replication { + +auto ParseDatabaseEndpoints(const std::string &database_endpoints_str) { + const auto db_endpoints_strs = utils::Split(database_endpoints_str, ","); + std::vector database_endpoints; + for (const auto &db_endpoint_str : db_endpoints_strs) { + const auto maybe_host_port = + io::network::Endpoint::ParseSocketOrIpAddress(db_endpoint_str, 7687); + CHECK(maybe_host_port); + database_endpoints.emplace_back( + io::network::Endpoint(maybe_host_port->first, maybe_host_port->second)); + } + return database_endpoints; +} + +auto Connect(const io::network::Endpoint &database_endpoint) { + mg::Client::Params params; + params.host = database_endpoint.address; + params.port = database_endpoint.port; + params.use_ssl = FLAGS_use_ssl; + auto client = mg::Client::Connect(params); + if (!client) { + LOG(FATAL) << "Failed to connect!"; + } + return client; +} + +class IntGenerator { + public: + IntGenerator(const std::string &purpose, int start, int end) + : seed_(std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count()), + rng_(seed_), + dist_(start, end) { + LOG(INFO) << purpose << " int generator seed: " << seed_; + } + + int Next() { return dist_(rng_); } + + private: + uint64_t seed_; + std::mt19937 rng_; + std::uniform_int_distribution dist_; +}; + +} // namespace mg::e2e::replication diff --git a/tests/e2e/replication/constraints.cpp b/tests/e2e/replication/constraints.cpp new file mode 100644 index 000000000..6701b8f1a --- /dev/null +++ b/tests/e2e/replication/constraints.cpp @@ -0,0 +1,146 @@ +#include 
+#include +#include +#include +#include + +#include +#include +#include + +#include "common.hpp" +#include "utils/thread.hpp" +#include "utils/timer.hpp" + +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Replication Read-write Benchmark"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + auto database_endpoints = + mg::e2e::replication::ParseDatabaseEndpoints(FLAGS_database_endpoints); + + mg::Client::Init(); + + { + auto client = mg::e2e::replication::Connect(database_endpoints[0]); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + client->Execute("CREATE INDEX ON :Node(id);"); + client->DiscardAll(); + client->Execute("CREATE CONSTRAINT ON (n:Node) ASSERT n.id IS UNIQUE;"); + client->DiscardAll(); + + // Sleep a bit so the constraints get replicated. + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for (const auto &database_endpoint : database_endpoints) { + auto client = mg::e2e::replication::Connect(database_endpoint); + client->Execute("SHOW CONSTRAINT INFO;"); + if (const auto data = client->FetchAll()) { + const auto label_name = (*data)[0][1].ValueString(); + const auto property_name = (*data)[0][2].ValueList()[0].ValueString(); + if (label_name != "Node" || property_name != "id") { + LOG(FATAL) << database_endpoint + << " does NOT hava valid constraint created."; + } + } else { + LOG(FATAL) << "Unable to get CONSTRAINT INFO from " + << database_endpoint; + } + } + LOG(INFO) << "All constraints are in-place."; + + for (int i = 0; i < FLAGS_nodes; ++i) { + client->Execute("CREATE (:Node {id:" + std::to_string(i) + "});"); + client->DiscardAll(); + } + mg::e2e::replication::IntGenerator edge_generator("EdgeCreateGenerator", 0, + FLAGS_nodes - 1); + for (int i = 0; i < FLAGS_edges; ++i) { + client->Execute("MATCH (n {id:" + std::to_string(edge_generator.Next()) + + "}), (m {id:" + std::to_string(edge_generator.Next()) + + "}) CREATE 
(n)-[:Edge]->(m);"); + client->DiscardAll(); + } + } + + { + const int num_threads = std::thread::hardware_concurrency(); + std::vector threads; + std::vector thread_duration; + threads.reserve(num_threads); + thread_duration.resize(num_threads); + + for (int i = 0; i < num_threads; ++i) { + const auto &database_endpoint = + database_endpoints[i % database_endpoints.size()]; + threads.emplace_back([i, &database_endpoint, + cluster_size = database_endpoints.size(), + &local_duration = thread_duration[i]] { + auto client = mg::e2e::replication::Connect(database_endpoint); + mg::e2e::replication::IntGenerator node_update_generator( + fmt::format("NodeUpdateGenerator {}", i), 0, FLAGS_nodes - 1); + utils::Timer t; + + while (true) { + local_duration = t.Elapsed().count(); + if (local_duration >= FLAGS_reads_duration_limit) break; + // In the main case try to update. + if (i % cluster_size == 0) { + try { + client->Execute("MATCH (n:Node {id:" + + std::to_string(node_update_generator.Next()) + + "}) SET n.id = " + + std::to_string(node_update_generator.Next()) + + " RETURN n.id;"); + client->FetchAll(); + } catch (const std::exception &e) { + // Pass. + } + } else { // In the replica case fetch all unique ids. + try { + client->Execute("MATCH (n) RETURN n.id;"); + const auto data = client->FetchAll(); + std::unordered_set unique; + for (const auto &value : *data) { + unique.insert(value[0].ValueInt()); + } + if ((*data).size() != unique.size()) { + LOG(FATAL) << "Some ids are equal."; + } + } catch (const std::exception &e) { + LOG(FATAL) << e.what(); + break; + } + } + } + }); + } + + for (auto &t : threads) { + if (t.joinable()) t.join(); + } + } + + { + auto client = mg::e2e::replication::Connect(database_endpoints[0]); + client->Execute("DROP CONSTRAINT ON (n:Node) ASSERT n.id IS UNIQUE"); + client->DiscardAll(); + // Sleep a bit so the drop constraints get replicated. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for (const auto &database_endpoint : database_endpoints) { + auto client = mg::e2e::replication::Connect(database_endpoint); + client->Execute("SHOW CONSTRAINT INFO;"); + if (const auto data = client->FetchAll()) { + if ((*data).size() != 0) { + LOG(FATAL) << database_endpoint << " still have some constraints."; + } + } else { + LOG(FATAL) << "Unable to get CONSTRAINT INFO from " + << database_endpoint; + } + } + LOG(INFO) << "All constraints were deleted."; + } + + return 0; +} diff --git a/tests/e2e/replication/read_write_benchmark.cpp b/tests/e2e/replication/read_write_benchmark.cpp new file mode 100644 index 000000000..3dc6f58a4 --- /dev/null +++ b/tests/e2e/replication/read_write_benchmark.cpp @@ -0,0 +1,153 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "common.hpp" +#include "utils/thread.hpp" +#include "utils/timer.hpp" + +DEFINE_string(output_file, + "memgraph__e2e__replication__read_write_benchmark.json", + "Output file where the results should be in JSON format."); + +int main(int argc, char **argv) { + google::SetUsageMessage("Memgraph E2E Replication Read-write Benchmark"); + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + const auto database_endpoints = + mg::e2e::replication::ParseDatabaseEndpoints(FLAGS_database_endpoints); + nlohmann::json output; + output["nodes"] = FLAGS_nodes; + output["edges"] = FLAGS_edges; + + mg::Client::Init(); + + { + auto client = mg::e2e::replication::Connect(database_endpoints[0]); + client->Execute("MATCH (n) DETACH DELETE n;"); + client->DiscardAll(); + client->Execute("CREATE INDEX ON :Node(id);"); + client->DiscardAll(); + + // Sleep a bit so the index get replicated. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for (const auto &database_endpoint : database_endpoints) { + auto client = mg::e2e::replication::Connect(database_endpoint); + client->Execute("SHOW INDEX INFO;"); + if (auto data = client->FetchAll()) { + auto label_name = (*data)[0][1].ValueString(); + auto property_name = (*data)[0][2].ValueString(); + if (label_name != "Node" || property_name != "id") { + LOG(FATAL) << database_endpoint + << " does NOT have valid indexes created."; + } + } else { + LOG(FATAL) << "Unable to get INDEX INFO from " << database_endpoint; + } + } + LOG(INFO) << "All indexes are in-place."; + + utils::Timer node_write_timer; + for (int i = 0; i < FLAGS_nodes; ++i) { + client->Execute("CREATE (:Node {id:" + std::to_string(i) + "});"); + client->DiscardAll(); + } + output["node_write_time"] = node_write_timer.Elapsed().count(); + + mg::e2e::replication::IntGenerator edge_generator("EdgeCreateGenerator", 0, + FLAGS_nodes - 1); + utils::Timer edge_write_timer; + for (int i = 0; i < FLAGS_edges; ++i) { + client->Execute("MATCH (n {id:" + std::to_string(edge_generator.Next()) + + "}), (m {id:" + std::to_string(edge_generator.Next()) + + "}) CREATE (n)-[:Edge]->(m);"); + client->DiscardAll(); + } + output["edge_write_time"] = node_write_timer.Elapsed().count(); + } + + { // Benchmark read queries. 
+ const int num_threads = std::thread::hardware_concurrency(); + std::atomic query_counter{0}; + std::vector threads; + std::vector thread_duration; + threads.reserve(num_threads); + thread_duration.resize(num_threads); + + for (int i = 0; i < num_threads; ++i) { + const auto &database_endpoint = + database_endpoints[i % database_endpoints.size()]; + threads.emplace_back([i, &database_endpoint, &query_counter, + &local_duration = thread_duration[i]] { + utils::ThreadSetName(fmt::format("BenchWriter{}", i)); + auto client = mg::e2e::replication::Connect(database_endpoint); + mg::e2e::replication::IntGenerator node_generator( + fmt::format("NodeReadGenerator {}", i), 0, FLAGS_nodes - 1); + utils::Timer t; + + while (true) { + local_duration = t.Elapsed().count(); + if (local_duration >= FLAGS_reads_duration_limit) break; + try { + client->Execute( + "MATCH (n {id:" + std::to_string(node_generator.Next()) + + "})-[e]->(m) RETURN e, m;"); + client->DiscardAll(); + query_counter.fetch_add(1); + } catch (const std::exception &e) { + LOG(FATAL) << e.what(); + break; + } + } + }); + } + + for (auto &t : threads) { + if (t.joinable()) t.join(); + } + + double all_duration = 0; + for (auto &d : thread_duration) all_duration += d; + output["total_read_duration"] = all_duration; + double per_thread_read_duration = all_duration / num_threads; + output["per_thread_read_duration"] = per_thread_read_duration; + output["read_per_second"] = query_counter / per_thread_read_duration; + output["read_queries"] = query_counter.load(); + + LOG(INFO) << "Output data: " << output.dump(); + std::ofstream output_file(FLAGS_output_file); + output_file << output.dump(); + output_file.close(); + } + + { + auto client = mg::e2e::replication::Connect(database_endpoints[0]); + client->Execute("DROP INDEX ON :Node(id);"); + client->DiscardAll(); + // Sleep a bit so the drop index get replicated. 
+ std::this_thread::sleep_for(std::chrono::milliseconds(500)); + for (const auto &database_endpoint : database_endpoints) { + auto client = mg::e2e::replication::Connect(database_endpoint); + client->Execute("SHOW INDEX INFO;"); + if (const auto data = client->FetchAll()) { + if ((*data).size() != 0) { + LOG(FATAL) << database_endpoint << " still have some indexes."; + } + } else { + LOG(FATAL) << "Unable to get INDEX INFO from " << database_endpoint; + } + } + LOG(INFO) << "All indexes were deleted."; + } + + mg::Client::Finalize(); + + return 0; +} diff --git a/tests/e2e/replication/workloads.yaml b/tests/e2e/replication/workloads.yaml new file mode 100644 index 000000000..5f5a0d65d --- /dev/null +++ b/tests/e2e/replication/workloads.yaml @@ -0,0 +1,45 @@ +template_test_nodes_query: &template_test_nodes_query + - query: "MATCH (n) RETURN count(n);" + expected: 1000 +template_test_edge_query: &template_test_edges_query + - query: "MATCH (n)-[r]->(m) RETURN count(r);" + expected: 5000 +template_validation_queries: &template_validation_queries + validation_queries: + - <<: *template_test_nodes_query + - <<: *template_test_edges_query +template_cluster: &template_cluster + cluster: + replica_1: + args: ["--bolt-port", "7688"] + setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"] + <<: *template_validation_queries + replica_2: + args: ["--bolt-port", "7689"] + setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"] + <<: *template_validation_queries + replica_3: + args: ["--bolt-port", "7690"] + setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"] + <<: *template_validation_queries + main: + args: ["--bolt-port", "7687"] + setup_queries: [ + "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'", + "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'", + "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'" + ] + <<: *template_validation_queries + +workloads: + - name: "Constraints" + binary: 
"tests/e2e/replication/memgraph__e2e__replication__constraints" + args: [] + <<: *template_cluster + + - name: "Read-write benchmark" + binary: "tests/e2e/replication/memgraph__e2e__replication__read_write_benchmark" + args: [] + <<: *template_cluster + + diff --git a/tests/e2e/runner.py b/tests/e2e/runner.py new file mode 100755 index 000000000..d1d29ac23 --- /dev/null +++ b/tests/e2e/runner.py @@ -0,0 +1,71 @@ +from argparse import ArgumentParser +import atexit +import logging +import os +import subprocess +import yaml + +from memgraph import MemgraphInstanceRunner + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..")) +BUILD_DIR = os.path.join(PROJECT_DIR, "build") +MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph") + +log = logging.getLogger("memgraph.tests.e2e.replication") + + +def load_args(): + parser = ArgumentParser() + parser.add_argument("--workloads-path", required=True) + parser.add_argument("--workload-name", default=None, required=False) + return parser.parse_args() + + +def load_workloads(path): + with open(path, "r") as f: + return yaml.load(f, Loader=yaml.FullLoader)['workloads'] + + +def run(args): + workloads = load_workloads(args.workloads_path) + for workload in workloads: + workload_name = workload['name'] + if args.workload_name is not None and \ + args.workload_name != workload_name: + continue + log.info("%s STARTED.", workload_name) + # Setup. + mg_instances = {} + @atexit.register + def cleanup(): + for mg_instance in mg_instances.values(): + mg_instance.stop() + for name, config in workload['cluster'].items(): + mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY) + mg_instances[name] = mg_instance + mg_instance.start(args=config['args']) + for query in config['setup_queries']: + mg_instance.query(query) + # Test. 
+ mg_test_binary = os.path.join(BUILD_DIR, workload['binary']) + subprocess.run( + [mg_test_binary] + workload['args'], + check=True, + stderr=subprocess.STDOUT) + # Validation. + for name, config in workload['cluster'].items(): + for validation in config['validation_queries']: + mg_instance = mg_instances[name] + data = mg_instance.query(validation['query'])[0][0] + assert data == validation['expected'] + cleanup() + log.info("%s PASSED.", workload_name) + + +if __name__ == '__main__': + args = load_args() + logging.basicConfig( + level=logging.INFO, + format='%(levelname)s %(asctime)s %(name)s] %(message)s') + run(args) diff --git a/tests/feature_benchmark/CMakeLists.txt b/tests/feature_benchmark/CMakeLists.txt deleted file mode 100644 index 89909e4e1..000000000 --- a/tests/feature_benchmark/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -## ha test binaries -#add_subdirectory(ha) diff --git a/tests/feature_benchmark/ha/CMakeLists.txt b/tests/feature_benchmark/ha/CMakeLists.txt deleted file mode 100644 index 8f235fe96..000000000 --- a/tests/feature_benchmark/ha/CMakeLists.txt +++ /dev/null @@ -1,3 +0,0 @@ -# test binaries -add_subdirectory(read) -add_subdirectory(write) diff --git a/tests/feature_benchmark/ha/read/CMakeLists.txt b/tests/feature_benchmark/ha/read/CMakeLists.txt deleted file mode 100644 index f1fac5062..000000000 --- a/tests/feature_benchmark/ha/read/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__feature_benchmark__ha__read) - -set(benchmark_target_name ${target_name}__benchmark) -add_executable(${benchmark_target_name} benchmark.cpp) -set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark) -target_link_libraries(${benchmark_target_name} mg-utils mg-communication) diff --git a/tests/feature_benchmark/ha/read/benchmark.cpp b/tests/feature_benchmark/ha/read/benchmark.cpp deleted file mode 100644 index 877f2a33b..000000000 --- a/tests/feature_benchmark/ha/read/benchmark.cpp +++ /dev/null @@ -1,127 +0,0 
@@ -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/flag_validation.hpp" -#include "utils/thread.hpp" -#include "utils/timer.hpp" - -using namespace std::literals::chrono_literals; - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); -DEFINE_double(duration, 10.0, - "How long should the client perform reads (seconds)"); -DEFINE_string(output_file, "", "Output file where the results should be."); -DEFINE_int32(nodes, 1000, "Number of nodes in DB"); -DEFINE_int32(edges, 5000, "Number of edges in DB"); - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::SetUsageMessage("Memgraph HA read benchmark client"); - google::InitGoogleLogging(argv[0]); - - std::atomic query_counter{0}; - - std::vector endpoints; - for (int i = 0; i < FLAGS_cluster_size; ++i) { - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - endpoints.push_back(endpoint); - } - - // Populate the db (random graph with given number of nodes and edges) - // Raft reelection constants are between 300ms to 500ms, so we - // use 1000ms to avoid using up all retries unnecessarily during reelection. 
- std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 10, retry_delay); - - for (int i = 0; i < FLAGS_nodes; ++i) { - client.Execute("CREATE (:Node {id:" + std::to_string(i) + "})", {}); - } - - auto seed = - std::chrono::high_resolution_clock::now().time_since_epoch().count(); - std::mt19937 rng(seed); - std::uniform_int_distribution dist(0, FLAGS_nodes - 1); - - for (int i = 0; i < FLAGS_edges; ++i) { - int a = dist(rng), b = dist(rng); - client.Execute("MATCH (n {id:" + std::to_string(a) + "})," + - " (m {id:" + std::to_string(b) + "})" + - "CREATE (n)-[:Edge]->(m);", {}); - } - - const int num_threads = std::thread::hardware_concurrency(); - std::vector threads; - std::vector thread_duration; - threads.reserve(num_threads); - thread_duration.resize(num_threads); - - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([i, &endpoints, &query_counter, retry_delay, - &local_duration = thread_duration[i]]() { - utils::ThreadSetName(fmt::format("BenchWriter{}", i)); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 10, retry_delay); - - auto seed = - std::chrono::high_resolution_clock::now().time_since_epoch().count(); - std::mt19937 rng(seed); - std::uniform_int_distribution dist(0, FLAGS_nodes - 1); - - utils::Timer t; - while (true) { - local_duration = t.Elapsed().count(); - if (local_duration >= FLAGS_duration) break; - int id = dist(rng); - - try { - client.Execute("MATCH (n {id:" + std::to_string(id) + - "})-[e]->(m) RETURN e, m;", {}); - query_counter.fetch_add(1); - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) << e.what(); - break; - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << e.what(); - break; - } - } - }); - } - - for (auto &t : threads) { 
- if (t.joinable()) t.join(); - } - - double duration = 0; - for (auto &d : thread_duration) duration += d; - duration /= num_threads; - - double read_per_second = query_counter / duration; - - std::ofstream output(FLAGS_output_file); - output << "duration " << duration << std::endl; - output << "executed_reads " << query_counter << std::endl; - output << "read_per_second " << read_per_second << std::endl; - output.close(); - - return 0; -} diff --git a/tests/feature_benchmark/ha/read/coordination.json b/tests/feature_benchmark/ha/read/coordination.json deleted file mode 100644 index 35b8c65aa..000000000 --- a/tests/feature_benchmark/ha/read/coordination.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - [1, "127.0.0.1", 10000], - [2, "127.0.0.1", 10001], - [3, "127.0.0.1", 10002] -] diff --git a/tests/feature_benchmark/ha/read/raft.json b/tests/feature_benchmark/ha/read/raft.json deleted file mode 100644 index 744fcf1f6..000000000 --- a/tests/feature_benchmark/ha/read/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 350, - "election_timeout_max": 700, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/feature_benchmark/ha/read/runner.sh b/tests/feature_benchmark/ha/read/runner.sh deleted file mode 100755 index cd55fc7db..000000000 --- a/tests/feature_benchmark/ha/read/runner.sh +++ /dev/null @@ -1,96 +0,0 @@ -#!/bin/bash - -## Helper functions - -function wait_for_server { - port=$1 - while ! nc -z -w 1 127.0.0.1 $port; do - sleep 0.1 - done - sleep 1 -} - -function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; } -function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; } -function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; } - -## Environment setup - -# Get script location. -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -cd "$DIR" - -# Find memgraph binaries. 
-binary_dir="$DIR/../../../../build" - -# Results for apollo -RESULTS="$DIR/.apollo_measurements" - -# Benchmark parameters -DURATION=10 - -# Startup -declare -a HA_PIDS -declare -a exit_codes - -for server_id in 1 2 3 -do - $binary_dir/memgraph_ha --server_id $server_id \ - --coordination_config_file="coordination.json" \ - --raft_config_file="raft.json" \ - --bolt-port $((7686 + $server_id)) \ - --db-recover-on-startup=false \ - --durability_directory=dur$server_id & - HA_PIDS[$server_id]=$! - wait_for_server $((7686 + $server_id)) -done - -# Allow some time for leader election. -sleep 10 - -# Start the memgraph process and wait for it to start. -echo_info "Starting HA read benchmark" -$binary_dir/tests/feature_benchmark/ha/read/benchmark \ - --duration=$DURATION \ - --output-file=$RESULTS & -pid=$! - -wait -n $pid -exit_codes[0]=$? - -# Shutdown -for server_id in 1 2 3 -do - kill -15 ${HA_PIDS[$server_id]} -done - -# Cleanup -for server_id in 1 2 3 -do - wait -n ${HA_PIDS[$server_id]} - exit_codes[$server_id]=$? 
- rm -r dur$server_id -done - -failure=0 - -for i in `seq 0 3`; do - code=${exit_codes[$i]} - if [ $code -ne 0 ]; then - if [ $i -eq 0 ]; then - echo_failure "Benchmark exited with status $code" - else - echo_failure "Memgraph HA server $i exited with status $code" - fi - - failure=1 - fi -done - -if [ $failure -eq 0 ]; then - echo_success "Benchmark finished successfully" -else - echo_failure "Benchmark didn't finish successfully" -fi - -exit $failure diff --git a/tests/feature_benchmark/ha/write/CMakeLists.txt b/tests/feature_benchmark/ha/write/CMakeLists.txt deleted file mode 100644 index 8e3b15746..000000000 --- a/tests/feature_benchmark/ha/write/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__feature_benchmark__ha__write) - -set(benchmark_target_name ${target_name}__benchmark) -add_executable(${benchmark_target_name} benchmark.cpp) -set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark) -target_link_libraries(${benchmark_target_name} mg-utils mg-communication) diff --git a/tests/feature_benchmark/ha/write/benchmark.cpp b/tests/feature_benchmark/ha/write/benchmark.cpp deleted file mode 100644 index 55063f0e7..000000000 --- a/tests/feature_benchmark/ha/write/benchmark.cpp +++ /dev/null @@ -1,94 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/flag_validation.hpp" -#include "utils/thread.hpp" -#include "utils/timer.hpp" - -using namespace std::literals::chrono_literals; - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); -DEFINE_double(duration, 10.0, 
- "How long should the client perform writes (seconds)"); -DEFINE_string(output_file, "", "Output file where the results should be."); - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::SetUsageMessage("Memgraph HA write benchmark client"); - google::InitGoogleLogging(argv[0]); - - std::atomic query_counter{0}; - - std::vector endpoints; - for (int i = 0; i < FLAGS_cluster_size; ++i) { - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - endpoints.push_back(endpoint); - } - - const int num_threads = std::thread::hardware_concurrency(); - std::vector threads; - std::vector thread_duration; - threads.reserve(num_threads); - thread_duration.resize(num_threads); - - for (int i = 0; i < num_threads; ++i) { - threads.emplace_back([i, endpoints = endpoints, &query_counter, - &local_duration = thread_duration[i]]() { - utils::ThreadSetName(fmt::format("BenchWriter{}", i)); - communication::ClientContext context(FLAGS_use_ssl); - std::chrono::milliseconds retry_delay(1000); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 10, retry_delay); - - utils::Timer t; - while (true) { - local_duration = t.Elapsed().count(); - if (local_duration >= FLAGS_duration) break; - - try { - client.Execute("CREATE (:Node)", {}); - query_counter.fetch_add(1); - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) << e.what(); - break; - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << e.what(); - break; - } - } - }); - } - - for (auto &t : threads) { - if (t.joinable()) t.join(); - } - - double duration = 0; - for (auto &d : thread_duration) duration += d; - duration /= num_threads; - - double write_per_second = query_counter / duration; - - std::ofstream output(FLAGS_output_file); - output << "duration " << duration << std::endl; - output << "executed_writes " << query_counter << std::endl; - output << 
"write_per_second " << write_per_second << std::endl; - output.close(); - - return 0; -} diff --git a/tests/feature_benchmark/ha/write/coordination.json b/tests/feature_benchmark/ha/write/coordination.json deleted file mode 100644 index 35b8c65aa..000000000 --- a/tests/feature_benchmark/ha/write/coordination.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - [1, "127.0.0.1", 10000], - [2, "127.0.0.1", 10001], - [3, "127.0.0.1", 10002] -] diff --git a/tests/feature_benchmark/ha/write/raft.json b/tests/feature_benchmark/ha/write/raft.json deleted file mode 100644 index 744fcf1f6..000000000 --- a/tests/feature_benchmark/ha/write/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 350, - "election_timeout_max": 700, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/feature_benchmark/ha/write/runner.sh b/tests/feature_benchmark/ha/write/runner.sh deleted file mode 100755 index 0c8e26ce8..000000000 --- a/tests/feature_benchmark/ha/write/runner.sh +++ /dev/null @@ -1,96 +0,0 @@ -#!/bin/bash - -## Helper functions - -function wait_for_server { - port=$1 - while ! nc -z -w 1 127.0.0.1 $port; do - sleep 0.1 - done - sleep 1 -} - -function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; } -function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; } -function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; } - -## Environment setup - -# Get script location. -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -cd "$DIR" - -# Find memgraph binaries. 
-binary_dir="$DIR/../../../../build" - -# Results for apollo -RESULTS="$DIR/.apollo_measurements" - -# Benchmark parameters -DURATION=10 - -# Startup -declare -a HA_PIDS -declare -a exit_codes - -for server_id in 1 2 3 -do - $binary_dir/memgraph_ha --server_id $server_id \ - --coordination_config_file="coordination.json" \ - --raft_config_file="raft.json" \ - --bolt-port $((7686 + $server_id)) \ - --db-recover-on-startup=false \ - --durability_directory=dur$server_id & - HA_PIDS[$server_id]=$! - wait_for_server $((7686 + $server_id)) -done - -# Allow some time for leader election. -sleep 10 - -# Start the memgraph process and wait for it to start. -echo_info "Starting HA write benchmark" -$binary_dir/tests/feature_benchmark/ha/write/benchmark \ - --duration=$DURATION \ - --output-file=$RESULTS & -pid=$! - -wait -n $pid -exit_codes[0]=$? - -# Shutdown -for server_id in 1 2 3 -do - kill -15 ${HA_PIDS[$server_id]} -done - -# Cleanup -for server_id in 1 2 3 -do - wait -n ${HA_PIDS[$server_id]} - exit_codes[$server_id]=$? 
- rm -r dur$server_id -done - -failure=0 - -for i in `seq 0 3`; do - code=${exit_codes[$i]} - if [ $code -ne 0 ]; then - if [ $i -eq 0 ]; then - echo_failure "Benchmark exited with status $code" - else - echo_failure "Memgraph HA server $i exited with status $code" - fi - - failure=1 - fi -done - -if [ $failure -eq 0 ]; then - echo_success "Benchmark finished successfully" -else - echo_failure "Benchmark didn't finish successfully" -fi - -exit $failure diff --git a/tests/integration/ha/basic/CMakeLists.txt b/tests/integration/ha/basic/CMakeLists.txt deleted file mode 100644 index bc5b55923..000000000 --- a/tests/integration/ha/basic/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_basic) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/basic/raft.json b/tests/integration/ha/basic/raft.json deleted file mode 100644 index ede2ddc62..000000000 --- a/tests/integration/ha/basic/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 750, - "election_timeout_max": 1000, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/basic/runner.py b/tests/integration/ha/basic/runner.py deleted file mode 100755 index 22502a255..000000000 --- a/tests/integration/ha/basic/runner.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/python3 - -""" -This test checks the the basic functionality of HA Memgraph. It incorporates -both leader election and log replication processes. 
- -The test proceeds as follows for clusters of size 3 and 5: - 1) Start the whole cluster - 2) Kill random workers but leave the majority alive - 3) Create a single Node - 4) Bring dead nodes back to life - 5) Kill random workers but leave the majority alive - 6) Check if everything is ok with DB state - 7) GOTO 1) and repeat 25 times -""" - -import argparse -import os -import time -import subprocess -import sys -import random - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -class HaBasicTest(HaTestBase): - def execute_step(self, step, node_count): - if step == "create": - print("Executing create query") - client = subprocess.Popen([self.tester_binary, - "--step", "create", - "--cluster-size", str(self.cluster_size), - "--node-count", str(node_count)]) - elif step == "count": - print("Executing count query") - client = subprocess.Popen([self.tester_binary, - "--step", "count", - "--cluster_size", str(self.cluster_size), - "--node-count", str(node_count)]) - else: - return 0 - - # Check what happened with query execution. - try: - code = client.wait(timeout=30) - except subprocess.TimeoutExpired as e: - print("HA client timed out!") - client.kill() - return 1 - - return code - - - def start_workers(self, worker_ids): - for wid in worker_ids: - print("Starting worker {}".format(wid + 1)) - self.start_worker(wid) - - - def kill_workers(self, worker_ids): - for wid in worker_ids: - print("Killing worker {}".format(wid + 1)) - self.kill_worker(wid) - - - def execute(self): - self.start_cluster() - - expected_results = 0 - - # Make sure at least one node exists. 
- assert self.execute_step("create", expected_results) == 0, \ - "Error while executing create query" - expected_results = 1 - - for i in range(20): - # Create step - partition = random.sample(range(self.cluster_size), - random.randint(0, int((self.cluster_size - 1) / 2))) - - self.kill_workers(partition) - - assert self.execute_step("create", expected_results) == 0, \ - "Error while executing create query" - expected_results += 1 - - self.start_workers(partition) - - # Check step - partition = random.sample(range(self.cluster_size), - random.randint(0, int((self.cluster_size - 1) / 2))) - - self.kill_workers(partition) - - assert self.execute_step("count", expected_results) == 0, \ - "Error while executing count query" - - self.start_workers(partition) - - # Check that no data was lost. - assert self.execute_step("count", expected_results) == 0, \ - "Error while executing count query" - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "basic", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "basic", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - HaBasicTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/basic/tester.cpp b/tests/integration/ha/basic/tester.cpp deleted file mode 100644 index d36ef4b38..000000000 --- a/tests/integration/ha/basic/tester.cpp +++ /dev/null @@ -1,67 +0,0 @@ -#include -#include 
-#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_int32(node_count, -1, "Expected number of nodes in the database."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); -DEFINE_string(step, "", "The step to execute (available: create, count)"); - -using namespace std::chrono_literals; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::SSLInit sslInit; - - try { - std::vector endpoints(FLAGS_cluster_size); - for (int i = 0; i < FLAGS_cluster_size; ++i) - endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i); - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 25, retry_delay); - - if (FLAGS_step == "create") { - client.Execute("create (:Node {id: $id})", - {{"id", FLAGS_node_count + 1}}); - return 0; - } else if (FLAGS_step == "count") { - auto result = client.Execute("match (n) return n", {}); - if (result.records.size() != FLAGS_node_count) { - LOG(WARNING) << "Missing data: expected " << FLAGS_node_count - << ", got " << result.records.size(); - return 2; - } - return 0; - } else { - LOG(FATAL) << "Unexpected client step!"; - } - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) - << "Transient error while executing query. (eg. 
mistyped query, etc.)\n" - << e.what(); - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << "Couldn't connect to server\n" << e.what(); - } catch (const utils::BasicException &e) { - LOG(WARNING) << "Error while executing query\n" << e.what(); - } - - // The test wasn't successfull - return 1; -} diff --git a/tests/integration/ha/constraints/CMakeLists.txt b/tests/integration/ha/constraints/CMakeLists.txt deleted file mode 100644 index 270e22861..000000000 --- a/tests/integration/ha/constraints/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_constraints) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/constraints/raft.json b/tests/integration/ha/constraints/raft.json deleted file mode 100644 index 93a21c5b7..000000000 --- a/tests/integration/ha/constraints/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 200, - "election_timeout_max": 500, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/constraints/runner.py b/tests/integration/ha/constraints/runner.py deleted file mode 100755 index 45d1ac2ce..000000000 --- a/tests/integration/ha/constraints/runner.py +++ /dev/null @@ -1,119 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import os -import time -import random -import subprocess -import sys - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -class HaIndexTest(HaTestBase): - def execute_step(self, step, property_value=None, expected_status=None, - 
expected_result=None): - if step == "create": - print("Executing create query") - client = subprocess.Popen([self.tester_binary, "--step", "create", - "--cluster_size", str(self.cluster_size)]) - - elif step == "drop": - print("Executing drop query") - client = subprocess.Popen([self.tester_binary, "--step", "drop", - "--cluster_size", str(self.cluster_size)]) - elif step == "add_node": - print("Executing add_node query ") - client = subprocess.Popen([self.tester_binary, "--step", "add_node", - "--cluster_size", str(self.cluster_size), "--expected_status", - str(expected_status), "--property_value", str(property_value)]) - - elif step == "check": - print("Executing check query") - client = subprocess.Popen([self.tester_binary, "--step", "check", - "--cluster_size", str(self.cluster_size), "--expected_result", - str(expected_result)]) - else: - raise ValueError("Invalid step argument: " + step) - - # Check what happened with query execution. - try: - code = client.wait(timeout=30) - except subprocess.TimeoutExpired as e: - print("HA client timed out!") - client.kill() - return 1 - - return code - - - def execute(self): - self.start_cluster() - num_nodes = 1 - - assert self.execute_step("add_node", expected_status=0, \ - property_value=num_nodes) == 0, \ - "Error while executing add_node query" - - assert self.execute_step("create") == 0, \ - "Error while executing create query" - - assert self.execute_step("check", expected_result=num_nodes) == 0, \ - "Error while executing check query" - - for i in range(self.cluster_size): - # Kill worker. - print("Killing worker {}".format(i)) - self.kill_worker(i) - - assert self.execute_step("add_node", expected_status=1, \ - property_value=num_nodes) == 0, \ - "Error while executing add_node query" - - assert self.execute_step("add_node", expected_status=0, \ - property_value=num_nodes + 1) == 0, \ - "Error while executing add_node query" - - num_nodes += 1 - - # Bring worker back to life. 
- print("Starting worker {}".format(i)) - self.start_worker(i) - - assert self.execute_step("drop") == 0, \ - "Error while executing drop query" - - assert self.execute_step("check", expected_result=num_nodes) == 0, \ - "Error while executing check query" - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "constraints", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "constraints", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - HaIndexTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/constraints/tester.cpp b/tests/integration/ha/constraints/tester.cpp deleted file mode 100644 index a9912f579..000000000 --- a/tests/integration/ha/constraints/tester.cpp +++ /dev/null @@ -1,99 +0,0 @@ -#include -#include -#include - -#include -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_int32(num_retries, 20, "Number of (leader) execution retries."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); - 
-DEFINE_string(step, "", - "The step to execute (available: create, check, add_node, drop"); -DEFINE_int32(property_value, 0, "Value of the property when creating a node."); -DEFINE_int32( - expected_status, 0, - "Expected query execution status when creating a node, 0 is success"); -DEFINE_int32(expected_result, 0, "Expected query result"); - -using namespace std::chrono_literals; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::SSLInit sslInit; - - try { - std::vector endpoints; - for (int i = 0; i < FLAGS_cluster_size; ++i) { - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - endpoints.push_back(endpoint); - } - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, FLAGS_num_retries, - retry_delay); - - if (FLAGS_step == "create") { - client.Execute("create constraint on (n:Node) assert n.prop is unique", - {}); - return 0; - } else if (FLAGS_step == "drop") { - client.Execute("drop constraint on (n:Node) assert n.prop is unique", {}); - return 0; - } else if (FLAGS_step == "add_node") { - client.Execute( - fmt::format("create (:Node{{prop:{}}})", FLAGS_property_value), {}); - - if (FLAGS_expected_status == 0) { - return 0; - } else { - LOG(WARNING) << "Query execution should've fail but it didn't."; - } - - } else if (FLAGS_step == "check") { - auto result = client.Execute("match (n) return n", {}); - - if (result.records.size() != FLAGS_expected_result) { - LOG(WARNING) << "Unexpected number of nodes: " - << "expected " << FLAGS_expected_result << ", got " - << result.records.size(); - return 2; - } - return 0; - - } else { - LOG(FATAL) << "Unexpected client step!"; - } - } catch (const communication::bolt::ClientQueryException &e) { - // Sometimes we expect the query to fail, so we 
need to handle this as - // success. - if (FLAGS_expected_status == 0) { - LOG(WARNING) << "There was some transient error during query execution."; - } else { - LOG(INFO) << "Query execution failed as expected, message: " << e.what(); - return 0; - } - } catch (const communication::bolt::ClientFatalException &) { - LOG(WARNING) << "Failed to communicate with the leader."; - } catch (const utils::BasicException &) { - LOG(WARNING) << "Error while executing query."; - } - - return 1; -} diff --git a/tests/integration/ha/ha_test.py b/tests/integration/ha/ha_test.py deleted file mode 100644 index 72a084362..000000000 --- a/tests/integration/ha/ha_test.py +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/bin/python3 - -import json -import os -import subprocess -import tempfile -import time - - -class HaTestBase: - def __init__(self, memgraph_binary, tester_binary, raft_config_file, - cluster_size): - - self.workers = [None for worker in range(cluster_size)] - self.memgraph_binary = memgraph_binary - self.tester_binary = tester_binary - self.raft_config_file = raft_config_file - self.cluster_size = cluster_size - - # Get a temporary directory used for durability. 
- self._tempdir = tempfile.TemporaryDirectory() - - # generate coordination config file - self.coordination_config_file = tempfile.NamedTemporaryFile() - coordination = self._generate_json_coordination_config() - self.coordination_config_file.write(bytes(coordination, "UTF-8")) - self.coordination_config_file.flush() - - self.execute() - - - def __del__(self): - self.destroy_cluster() - - - def start_cluster(self): - for worker_id in range(self.cluster_size): - self.start_worker(worker_id) - - # allow some time for leader election - time.sleep(5) - - - def destroy_cluster(self): - for worker in self.workers: - if worker is None: continue - worker.kill() - worker.wait() - self.workers.clear() - self.coordination_config_file.close() - - - def kill_worker(self, worker_id): - assert worker_id >= 0 and worker_id < self.cluster_size, \ - "Invalid worker ID {}".format(worker_id) - assert self.workers[worker_id] is not None, \ - "Worker {} doesn't exists".format(worker_id) - - self.workers[worker_id].kill() - self.workers[worker_id].wait() - self.workers[worker_id] = None - - - def start_worker(self, worker_id): - assert worker_id >= 0 and worker_id < self.cluster_size, \ - "Invalid worker ID {}".format(worker_id) - assert self.workers[worker_id] is None, \ - "Worker already exists".format(worker_id) - - self.workers[worker_id] = subprocess.Popen(self._generate_args(worker_id)) - - time.sleep(0.2) - assert self.workers[worker_id].poll() is None, \ - "Worker{} process died prematurely!".format(worker_id) - - self._wait_for_server(7687 + worker_id) - - - def is_worker_alive(self, worker_id): - assert worker_id >= 0 and worker_id < self.cluster_size, \ - "Invalid worker ID {}".format(worker_id) - return self.workers[worker_id] is None or \ - self.workers[worker_id].poll() is None - - - def execute(self): - raise NotImplementedError() - - - def _wait_for_server(self, port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - 
time.sleep(delay) - time.sleep(delay) - - - def get_durability_directory(self, worker_id): - return os.path.join(self._tempdir.name, "worker" + str(worker_id)) - - - def _generate_args(self, worker_id): - args = [self.memgraph_binary] - args.extend(["--server_id", str(worker_id + 1)]) - args.extend(["--bolt-port", str(7687 + worker_id)]) - args.extend(["--raft_config_file", self.raft_config_file]) - args.extend(["--coordination_config_file", - self.coordination_config_file.name]) - - # Each worker must have a unique durability directory. - args.extend(["--durability_directory", - self.get_durability_directory(worker_id)]) - return args - - - def _generate_json_coordination_config(self): - data = [] - for i in range(self.cluster_size): - data.append([i + 1, "127.0.0.1", 10000 + i]) - return json.dumps(data) - diff --git a/tests/integration/ha/index/CMakeLists.txt b/tests/integration/ha/index/CMakeLists.txt deleted file mode 100644 index 3269a1ee1..000000000 --- a/tests/integration/ha/index/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_index) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/index/raft.json b/tests/integration/ha/index/raft.json deleted file mode 100644 index 93a21c5b7..000000000 --- a/tests/integration/ha/index/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 200, - "election_timeout_max": 500, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/index/runner.py b/tests/integration/ha/index/runner.py deleted file mode 100755 index a45b2ddca..000000000 --- a/tests/integration/ha/index/runner.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import os -import 
time -import random -import subprocess -import sys - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -class HaIndexTest(HaTestBase): - def execute_step(self, step): - if step == "create": - print("Executing create query") - client = subprocess.Popen([self.tester_binary, "--step", "create", - "--cluster_size", str(self.cluster_size)]) - - elif step == "check": - print("Executing check query") - client = subprocess.Popen([self.tester_binary, "--step", "check", - "--cluster_size", str(self.cluster_size)]) - else: - return 0 - - # Check what happened with query execution. - try: - code = client.wait(timeout=30) - except subprocess.TimeoutExpired as e: - print("HA client timed out!") - client.kill() - return 1 - - return code - - - def execute(self): - self.start_cluster() - - assert self.execute_step("create") == 0, \ - "Error while executing create query" - - for i in range(self.cluster_size): - # Kill worker. - print("Killing worker {}".format(i + 1)) - self.kill_worker(i) - - assert self.execute_step("check") == 0, \ - "Error while executing check query" - - # Bring worker back to life. 
- print("Starting worker {}".format(i + 1)) - self.start_worker(i) - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "index", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "index", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - HaIndexTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/index/tester.cpp b/tests/integration/ha/index/tester.cpp deleted file mode 100644 index a7bb30970..000000000 --- a/tests/integration/ha/index/tester.cpp +++ /dev/null @@ -1,75 +0,0 @@ -#include -#include -#include - -#include -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); -DEFINE_string(step, "", "The step to execute (available: create, check)"); - -using namespace std::chrono_literals; -using communication::bolt::Value; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - const 
std::string index = ":Node(id)"; - - communication::SSLInit sslInit; - try { - std::vector endpoints(FLAGS_cluster_size); - for (int i = 0; i < FLAGS_cluster_size; ++i) - endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i); - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 25, retry_delay); - - if (FLAGS_step == "create") { - client.Execute(fmt::format("CREATE INDEX ON {}", index), {}); - return 0; - } else if (FLAGS_step == "check") { - auto result = client.Execute("SHOW INDEX INFO", {}); - auto checker = [&index](const std::vector &record) { - if (record.size() != 1) return false; - return record[0].ValueString() == index; - }; - - // Check that index ":Node(id)" exists - if (!std::any_of(result.records.begin(), result.records.end(), checker)) { - LOG(WARNING) << "Missing index!"; - return 2; - } - - return 0; - } else { - LOG(FATAL) << "Unexpected client step!"; - } - - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) - << "Transient error while executing query. (eg. 
mistyped query, etc.)\n" - << e.what(); - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << "Couldn't connect to server\n" << e.what(); - } catch (const utils::BasicException &e) { - LOG(WARNING) << "Error while executing query\n" << e.what(); - } - - // The test wasn't successfull - return 1; -} diff --git a/tests/integration/ha/large_log_entries/CMakeLists.txt b/tests/integration/ha/large_log_entries/CMakeLists.txt deleted file mode 100644 index 8052b40a0..000000000 --- a/tests/integration/ha/large_log_entries/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_large_log_entries) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/large_log_entries/raft.json b/tests/integration/ha/large_log_entries/raft.json deleted file mode 100644 index ede2ddc62..000000000 --- a/tests/integration/ha/large_log_entries/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 750, - "election_timeout_max": 1000, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/large_log_entries/runner.py b/tests/integration/ha/large_log_entries/runner.py deleted file mode 100755 index 45403c397..000000000 --- a/tests/integration/ha/large_log_entries/runner.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import os -import time -import random -import subprocess -import sys - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -class HaLargeLogEntriesTest(HaTestBase): - def 
execute_step(self, step, node_count, offset_nodes=None): - if step == "create": - print("Executing create query") - client = subprocess.Popen([self.tester_binary, "--step", "create", - "--cluster-size", str(self.cluster_size), - "--create-nodes", str(node_count), - "--offset-nodes", str(offset_nodes)]) - - elif step == "check": - print("Executing check query") - client = subprocess.Popen([self.tester_binary, "--step", "check", - "--cluster_size", str(self.cluster_size), - "--check_nodes", str(node_count)]) - else: - return 0 - - # Check what happened with query execution. - try: - code = client.wait(timeout=120) - except subprocess.TimeoutExpired as e: - print("HA client timed out!") - client.kill() - return 1 - - return code - - - def execute(self): - # Number of nodes to be created in a single batch - nodes = 250000 - - self.start_cluster() - - for i in range(self.cluster_size): - assert self.execute_step("create", nodes, i * nodes) == 0, \ - "Error while executing create query" - - # Kill worker. - print("Killing worker {}".format(i + 1)) - self.kill_worker(i) - - assert self.execute_step("check", (i + 1) * nodes) == 0, \ - "Error while executing check query" - - # Bring worker back to life. 
- print("Starting worker {}".format(i + 1)) - self.start_worker(i) - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "large_log_entries", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "large_log_entries", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - HaLargeLogEntriesTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/large_log_entries/tester.cpp b/tests/integration/ha/large_log_entries/tester.cpp deleted file mode 100644 index 14ef4badf..000000000 --- a/tests/integration/ha/large_log_entries/tester.cpp +++ /dev/null @@ -1,72 +0,0 @@ -#include -#include -#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_int32(create_nodes, 250000, "Number of nodes to be created."); -DEFINE_int32(offset_nodes, 0, "Initial ID of created nodes"); -DEFINE_int32(check_nodes, -1, "Number of nodes that should be in the database"); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); 
-DEFINE_string(step, "", "The step to execute (available: create, check)"); - -using namespace std::chrono_literals; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::SSLInit sslInit; - try { - std::vector endpoints(FLAGS_cluster_size); - for (int i = 0; i < FLAGS_cluster_size; ++i) - endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i); - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 60, retry_delay); - - if (FLAGS_step == "create") { - client.Execute("UNWIND RANGE($start, $stop) AS x CREATE(:Node {id: x})", - {{"start", FLAGS_offset_nodes}, - {"stop", FLAGS_offset_nodes + FLAGS_create_nodes - 1}}); - return 0; - } else if (FLAGS_step == "check") { - auto result = client.Execute("MATCH (n) RETURN COUNT(n)", {}); - if (result.records[0][0].ValueInt() != FLAGS_check_nodes) { - LOG(WARNING) << "Wrong number of nodes! Got " + - std::to_string(result.records[0][0].ValueInt()) + - ", but expected " + - std::to_string(FLAGS_check_nodes); - return 2; - } - return 0; - } else { - LOG(FATAL) << "Unexpected client step!"; - } - - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) - << "Transient error while executing query. (eg. 
mistyped query, etc.)\n" - << e.what(); - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << "Couldn't connect to server\n" << e.what(); - } catch (const utils::BasicException &e) { - LOG(WARNING) << "Error while executing query\n" << e.what(); - } - - // The test wasn't successfull - return 1; -} diff --git a/tests/integration/ha/leader_election/CMakeLists.txt b/tests/integration/ha/leader_election/CMakeLists.txt deleted file mode 100644 index aaaf68f6f..000000000 --- a/tests/integration/ha/leader_election/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_leader_election) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/leader_election/raft.json b/tests/integration/ha/leader_election/raft.json deleted file mode 100644 index ede2ddc62..000000000 --- a/tests/integration/ha/leader_election/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 750, - "election_timeout_max": 1000, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/leader_election/runner.py b/tests/integration/ha/leader_election/runner.py deleted file mode 100755 index 119d423ed..000000000 --- a/tests/integration/ha/leader_election/runner.py +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/python3 - -""" -This test checks the correctness of a leader election process when its decoupled -from log replication. In other words, in this test we do not change the state of -the database, i.e., the Raft log remains empty. 
- -The test proceeds as follows for clusters of size 3 and 5: - 1) Start a random subset of workers in the cluster - 2) Check if the leader has been elected - 3) Kill all living workers - 4) GOTO 1) and repeat 10 times - -Naturally, throughout the process we keep track the number of alive workers and, -based on that number (majority or not), we decide whether the leader should have -been elected. -""" - -import argparse -import os -import time -import random -import subprocess -import sys -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - -def random_subset(l): - return random.sample(l, random.randint(0, len(l) - 1)) - - -class HaLeaderElectionTest(HaTestBase): - def leader_check(self, has_majority): - client = subprocess.Popen([self.tester_binary, - "--has-majority", str(int(has_majority)), - "--cluster-size", str(int(cluster_size))]) - - # Check what happened with query execution. - try: - code = client.wait(timeout=60) - except subprocess.TimeoutExpired as e: - print("Error! 
client timeout expired.") - client.kill() - return 1 - - return code - - - def execute(self): - for i in range(10): - # Awake random subset of dead nodes - alive = random_subset(range(0, self.cluster_size)) - for wid in alive: - print("Starting worker {}".format(wid + 1)) - self.start_worker(wid) - - # Check if leader is elected - assert self.leader_check(2 * len(alive) > self.cluster_size) == 0, \ - "Error while trying to find leader" - - # Kill living nodes - for wid in alive: - print("Killing worker {}".format(wid + 1)) - self.kill_worker(wid) - - self.destroy_cluster() - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "leader_election", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "leader_election", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - HaLeaderElectionTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/leader_election/tester.cpp b/tests/integration/ha/leader_election/tester.cpp deleted file mode 100644 index fe0490ded..000000000 --- a/tests/integration/ha/leader_election/tester.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include -#include -#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, 
"Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_int32(has_majority, 0, "Should we be able to elect the leader."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); - -using namespace std::chrono_literals; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::SSLInit sslInit; - try { - std::vector endpoints(FLAGS_cluster_size); - for (int i = 0; i < FLAGS_cluster_size; ++i) - endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i); - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, 15, retry_delay); - - auto leader = client.GetLeaderId(); - if (!FLAGS_has_majority) { - LOG(WARNING) - << "The majority of cluster is dead but we have elected server " - << std::to_string(leader) << " as a leader"; - return 1; - } else { - LOG(INFO) << "Server " << std::to_string(leader) - << " was successfully elected as a leader"; - return 0; - } - - } catch (const communication::bolt::ClientFatalException &e) { - if (FLAGS_has_majority) { - LOG(WARNING) - << "The majority of cluster is alive but the leader was not elected."; - return 1; - } else { - LOG(INFO) - << "The has_majority of cluster is dead and no leader was elected."; - return 0; - } - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) - << "Transient error while executing query. (eg. 
mistyped query, etc.)\n" - << e.what(); - } catch (const utils::BasicException &e) { - LOG(WARNING) << "Error while executing query\n" << e.what(); - } - - // The test wasn't successfull - return 1; -} diff --git a/tests/integration/ha/log_compaction/raft.json b/tests/integration/ha/log_compaction/raft.json deleted file mode 100644 index a8e1e0f75..000000000 --- a/tests/integration/ha/log_compaction/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 200, - "election_timeout_max": 500, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": 100 -} diff --git a/tests/integration/ha/log_compaction/runner.py b/tests/integration/ha/log_compaction/runner.py deleted file mode 100755 index bd02cf565..000000000 --- a/tests/integration/ha/log_compaction/runner.py +++ /dev/null @@ -1,104 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import hashlib -import os -import time -import subprocess -import sys -import random - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -def shasum(filename): - with open(filename, 'rb') as f: - return hashlib.sha1(f.read()).hexdigest() - - raise Exception("Couldn't get shasum for file {}".format(filename)) - - -class HaLogCompactionTest(HaTestBase): - def execute_step(self, query): - client = subprocess.Popen( - [self.tester_binary, "--cluster_size", str(self.cluster_size)], - stdin=subprocess.PIPE, stdout=subprocess.DEVNULL) - - try: - client.communicate(input=bytes(query, "UTF-8"), timeout=30) - except subprocess.TimeoutExpired as e: - client.kill() - client.communicate() - return 1 - - return client.returncode - - - def get_snapshot_path(self, worker_id): - dur = os.path.join(self.get_durability_directory(worker_id), "snapshots") - snapshots = os.listdir(dur) - - assert 
len(snapshots) == 1, \ - "More than one snapshot on worker {}!".format(worker_id + 1) - return os.path.join(dur, snapshots[0]) - - - def execute(self): - # custom cluster startup - for worker_id in range(1, self.cluster_size): - self.start_worker(worker_id) - - time.sleep(5) - assert self.execute_step("CREATE (:Node)\n" * 128) == 0, \ - "Error while executing create query" - - self.start_worker(0) - - # allow some time for the snapshot transfer - time.sleep(5) - snapshot_shasum = shasum(self.get_snapshot_path(0)) - - success = False - for worker_id in range(1, self.cluster_size): - if shasum(self.get_snapshot_path(worker_id)) == snapshot_shasum: - success = True - break - - # Check if the cluster is alive - for worker_id in range(self.cluster_size): - assert self.is_worker_alive(worker_id), \ - "Worker {} died prematurely".format(worker_id + 1) - - assert success, "Snapshot didn't transfer successfully" - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "manual", - "ha_client")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "log_compaction", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - args = parser.parse_args() - - for cluster_size in [3, 5]: - print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m" % (cluster_size)) - HaLogCompactionTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/term_updates/CMakeLists.txt b/tests/integration/ha/term_updates/CMakeLists.txt deleted file mode 100644 index e43402228..000000000 --- a/tests/integration/ha/term_updates/CMakeLists.txt +++ 
/dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__ha_term_updates) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-utils mg-communication) diff --git a/tests/integration/ha/term_updates/raft.json b/tests/integration/ha/term_updates/raft.json deleted file mode 100644 index ede2ddc62..000000000 --- a/tests/integration/ha/term_updates/raft.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "election_timeout_min": 750, - "election_timeout_max": 1000, - "heartbeat_interval": 100, - "replication_timeout": 10000, - "log_size_snapshot_threshold": -1 -} diff --git a/tests/integration/ha/term_updates/runner.py b/tests/integration/ha/term_updates/runner.py deleted file mode 100755 index 6f446ed16..000000000 --- a/tests/integration/ha/term_updates/runner.py +++ /dev/null @@ -1,131 +0,0 @@ -#!/usr/bin/python3 - -import argparse -import os -import time -import random -import subprocess -import shutil -import sys - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) - -# append parent directory -sys.path.append(os.path.join(SCRIPT_DIR, "..")) - -from ha_test import HaTestBase - - -class HaTermUpdatesTest(HaTestBase): - def execute_step(self, step, expected_results=None): - if step == "create": - print("Executing create query") - client = subprocess.Popen([self.tester_binary, "--step", "create", - "--cluster_size", str(self.cluster_size)]) - - elif step == "count": - print("Executing count query") - client = subprocess.Popen([self.tester_binary, "--step", "count", - "--cluster_size", str(self.cluster_size), "--expected_results", - str(expected_results)]) - else: - raise ValueError("Invalid step argument: " + step) - - # Check what happened with query execution. 
- try: - code = client.wait(timeout=30) - except subprocess.TimeoutExpired as e: - print("Client timed out!") - client.kill() - return False - - return code == 0 - - def find_leader(self): - client = subprocess.run([self.tester_binary, - "--step", "find_leader", - "--cluster_size", str(self.cluster_size)], - stdout=subprocess.PIPE, check=True) - return int(client.stdout.decode('utf-8')) - 1 - - def execute(self): - self.start_cluster() # start a whole cluster from scratch - leader_id = self.find_leader() - follower_ids = list(set(range(self.cluster_size)) - {leader_id}) - - # Kill all followers. - for i in follower_ids: - print("Killing worker {}".format(i)) - self.kill_worker(i) - - # Try to execute a 'CREATE (n)' query on the leader. - # The query hangs because the leader doesn't have consensus. - assert not self.execute_step("create"), \ - "Error - a non-majorty cluster managed to execute a query" - - # Start a follower to create consensus so that the create succeeds. - print("Starting worker {}".format(follower_ids[0])) - self.start_worker(follower_ids[0]) - self.find_leader() # wait for leader re-election - assert self.execute_step("count", expected_results=1), \ - "Error while executing count query" - - # Kill the leader. - print("Killing leader (machine {})".format(leader_id)) - self.kill_worker(leader_id) - time.sleep(1) - - # Start the second follower to create a consensus with the first - # follower so that the first follower may become the new leader. - print("Starting worker {}".format(follower_ids[1])) - self.start_worker(follower_ids[1]) - self.find_leader() # wait for leader re-election - - # Verify that the data is there -> connect to the new leader and execute - # "MATCH (n) RETURN n" -> the data should be there. - assert self.execute_step("count", expected_results=1), \ - "Error while executing count query" - - # Try to assemble the whole cluster again by returning the old leader - # to the cluster as a fresh machine. 
- # (start the machine with its durability directory previously removed) - shutil.rmtree(self.get_durability_directory(leader_id)) - self.start_worker(leader_id) - self.find_leader() # wait for leader re-election - assert self.execute_step("count", expected_results=1), \ - "Error while executing count query" - - -def find_correct_path(path): - return os.path.join(PROJECT_DIR, "build", path) - - -if __name__ == "__main__": - memgraph_binary = find_correct_path("memgraph_ha") - tester_binary = find_correct_path(os.path.join("tests", "integration", "ha", - "term_updates", "tester")) - - raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", - "term_updates", "raft.json") - - parser = argparse.ArgumentParser() - parser.add_argument("--memgraph", default=memgraph_binary) - parser.add_argument("--raft_config_file", default=raft_config_file) - parser.add_argument("--username", default="") - parser.add_argument("--password", default="") - parser.add_argument("--encrypted", type=bool, default=False) - parser.add_argument("--address", type=str, - default="bolt://127.0.0.1") - parser.add_argument("--port", type=int, - default=7687) - args = parser.parse_args() - - cluster_size = 3 # test only works for 3 nodes - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" - % (cluster_size)) - HaTermUpdatesTest( - args.memgraph, tester_binary, args.raft_config_file, cluster_size) - print("\033[1;32m~~ The test finished successfully ~~\033[0m") - - sys.exit(0) diff --git a/tests/integration/ha/term_updates/tester.cpp b/tests/integration/ha/term_updates/tester.cpp deleted file mode 100644 index bfe98ed75..000000000 --- a/tests/integration/ha/term_updates/tester.cpp +++ /dev/null @@ -1,77 +0,0 @@ -#include -#include -#include - -#include -#include - -#include "communication/bolt/ha_client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); 
-DEFINE_int32(port, 7687, "Server port"); -DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); -DEFINE_int32(expected_results, -1, "Number of expected nodes."); -DEFINE_int32(num_retries, 20, "Number of (leader) execution retries."); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); - -DEFINE_string(step, "create", "The step to execute (available: create, count)"); - -using namespace std::chrono_literals; - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::SSLInit sslInit; - - try { - std::vector endpoints; - for (int i = 0; i < FLAGS_cluster_size; ++i) { - uint16_t port = FLAGS_port + i; - io::network::Endpoint endpoint{FLAGS_address, port}; - endpoints.push_back(endpoint); - } - - std::chrono::milliseconds retry_delay(1000); - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::HAClient client(endpoints, &context, FLAGS_username, - FLAGS_password, FLAGS_num_retries, - retry_delay); - - if (FLAGS_step == "create") { - client.Execute("create (:Node)", {}); - return 0; - - } else if (FLAGS_step == "count") { - auto result = client.Execute("match (n) return n", {}); - - if (result.records.size() != FLAGS_expected_results) { - LOG(WARNING) << "Unexpected number of nodes: " - << "expected " << FLAGS_expected_results << ", got " - << result.records.size(); - return 2; - } - return 0; - - } else if (FLAGS_step == "find_leader") { - std::cout << client.GetLeaderId(); - return 0; - - } else { - LOG(FATAL) << "Unexpected client step!"; - } - } catch (const communication::bolt::ClientQueryException &) { - LOG(WARNING) << "There was some transient error during query execution."; - } catch (const communication::bolt::ClientFatalException &) { - LOG(WARNING) << "Failed to communicate with the leader."; - } 
catch (const utils::BasicException &e) { - LOG(WARNING) << "Error while executing query."; - } - - return 1; -} diff --git a/tests/setup.sh b/tests/setup.sh new file mode 100755 index 000000000..e0120a9db --- /dev/null +++ b/tests/setup.sh @@ -0,0 +1,41 @@ +#!/bin/bash + +# shellcheck disable=1091 +set -Eeuo pipefail + +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +PIP_DEPS=( + "behave==1.2.6" + "ldap3==2.6" + "neo4j-driver==4.1.1" + "parse==1.18.0" + "parse-type==0.5.2" + "pyyaml==5.3.1" + "six==1.15.0" +) +cd "$DIR" + +# Remove old virtualenv. +if [ -d ve3 ]; then + rm -rf ve3 +fi + +# Create new virtualenv. +virtualenv -p python3 ve3 +set +u +source "ve3/bin/activate" +set -u + +for pkg in "${PIP_DEPS[@]}"; do + pip --timeout 1000 install "$pkg" +done + +# Install mgclient from source because of full flexibility. +pushd "$DIR/../libs/pymgclient" > /dev/null +export MGCLIENT_INCLUDE_DIR="$DIR/../libs/mgclient/include" +export MGCLIENT_LIB_DIR="$DIR/../libs/mgclient/lib" +python3 setup.py build +python3 setup.py install +popd > /dev/null + +deactivate diff --git a/tools/src/mg_dump/main.cpp b/tools/src/mg_dump/main.cpp index 7457dd8bf..f8bf716b7 100644 --- a/tools/src/mg_dump/main.cpp +++ b/tools/src/mg_dump/main.cpp @@ -41,7 +41,7 @@ int main(int argc, char **argv) { mg_session_params_set_username(params, FLAGS_username.c_str()); mg_session_params_set_password(params, FLAGS_password.c_str()); } - mg_session_params_set_client_name(params, bolt_client_version.c_str()); + mg_session_params_set_user_agent(params, bolt_client_version.c_str()); mg_session_params_set_sslmode( params, FLAGS_use_ssl ?
MG_SSLMODE_REQUIRE : MG_SSLMODE_DISABLE); @@ -56,7 +56,14 @@ int main(int argc, char **argv) { return 1; } - if (mg_session_run(session, "DUMP DATABASE", nullptr, nullptr) < 0) { + if (mg_session_run(session, "DUMP DATABASE", nullptr, nullptr, nullptr, + nullptr) < 0) { + std::cerr << "Execution failed: " << mg_session_error(session) << std::endl; + mg_session_destroy(session); + return 1; + } + + if (mg_session_pull(session, nullptr) < 0) { std::cerr << "Execution failed: " << mg_session_error(session) << std::endl; mg_session_destroy(session); return 1; @@ -64,7 +71,7 @@ int main(int argc, char **argv) { // Fetch results mg_result *result; - while ((status = mg_session_pull(session, &result)) == 1) { + while ((status = mg_session_fetch(session, &result)) == 1) { const mg_list *row = mg_result_row(result); CHECK(mg_list_size(row) == 1) << "Error: dump client received data in unexpected format";