From 7e99e93e473b043f7635c8f3d3948c0e8feb9240 Mon Sep 17 00:00:00 2001 From: Mislav Bradac <mislav.bradac@memgraph.io> Date: Tue, 12 Sep 2017 15:25:43 +0200 Subject: [PATCH] Start work on parallel benchmark Summary: First version of our benchmark Reviewers: florijan, buda Reviewed By: florijan Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D740 --- .ycm_extra_conf.py | 3 +- config/benchmarking_throughput.conf | 37 ++ libs/setup.sh | 8 + src/communication/bolt/client.hpp | 9 +- src/io/network/addrinfo.cpp | 8 +- src/io/network/addrinfo.hpp | 8 +- src/io/network/network_endpoint.cpp | 36 +- src/io/network/network_endpoint.hpp | 16 +- src/io/network/socket.cpp | 64 +-- src/io/network/socket.hpp | 30 +- tests/macro_benchmark/CMakeLists.txt | 7 + tests/macro_benchmark/harness/clients.py | 136 +++++ .../harness/clients/common.hpp | 15 +- .../harness/clients/long_running_client.cpp | 267 +++++++++ .../harness/clients/postgres_client.hpp | 8 +- .../{harness_client.cpp => query_client.cpp} | 39 +- tests/macro_benchmark/harness/common.py | 19 + .../macro_benchmark/harness/config/neo4j.conf | 1 - .../harness/config/neo4j_long_running.conf | 324 +++++++++++ tests/macro_benchmark/harness/databases.py | 169 ++++++ .../harness/groups/pokec/.gitignore | 1 + .../groups/pokec/pokec_small.config.json | 3 + .../harness/groups/pokec/pokec_small.run.json | 28 + tests/macro_benchmark/harness/harness.py | 535 ++---------------- tests/macro_benchmark/harness/jail_faker.py | 27 +- .../harness/long_running_suite.py | 129 +++++ tests/macro_benchmark/harness/perf.py | 32 -- tests/macro_benchmark/harness/query_suite.py | 209 +++++++ .../harness/results/.gitignore | 2 - tools/apollo/build_debug | 2 +- tools/apollo/build_diff | 4 +- 31 files changed, 1530 insertions(+), 646 deletions(-) create mode 100644 config/benchmarking_throughput.conf create mode 100644 tests/macro_benchmark/harness/clients.py create mode 100644 tests/macro_benchmark/harness/clients/long_running_client.cpp rename tests/macro_benchmark/harness/clients/{harness_client.cpp => query_client.cpp} (77%) create mode 100644 tests/macro_benchmark/harness/common.py create mode 100644 tests/macro_benchmark/harness/config/neo4j_long_running.conf create mode 100644 tests/macro_benchmark/harness/databases.py create mode 100644 tests/macro_benchmark/harness/groups/pokec/.gitignore create mode 100644 tests/macro_benchmark/harness/groups/pokec/pokec_small.config.json create mode 100644 tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json create mode 100644 tests/macro_benchmark/harness/long_running_suite.py delete mode 100644 tests/macro_benchmark/harness/perf.py create mode 100644 tests/macro_benchmark/harness/query_suite.py delete mode 100644 tests/macro_benchmark/harness/results/.gitignore diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index 7c08015e1..d8651ea40 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -30,7 +30,8 @@ BASE_FLAGS = [ '-I./libs/antlr4/runtime/Cpp/runtime/src', '-I./build/libs/gflags/include', '-I./experimental/distributed/src', - '-I./experimental/distributed/libs/cereal/include' + '-I./experimental/distributed/libs/cereal/include', + '-I./libs/postgresql/include' ] SOURCE_EXTENSIONS = [ diff --git a/config/benchmarking_throughput.conf b/config/benchmarking_throughput.conf new file mode 100644 index 000000000..357d93b53 --- /dev/null +++ b/config/benchmarking_throughput.conf @@ -0,0 +1,37 @@ +# MEMGRAPH DEFAULT BENCHMARKING CONFIG + +# NOTE: all paths are relative to the run folder +# 
(where the executable is run) + +# directory to the codes which will be compiled +--compile-directory=compiled + +# path to the template (cpp) for codes generation +--template-cpp-path=template/plan_template_cpp + +# directory to the folder with snapshots +--snapshot-directory=snapshots + +# snapshot cycle interval +# if set to -1 the snapshooter will not run +--snapshot-cycle-sec=-1 + +# snapshot cycle interval +# if set to -1 the snapshooter will not run +--query_execution_time_sec=-1 + +# create snapshot disabled on db exit +--snapshot-on-db-exit=false + +# max number of snapshots which will be kept on the disk at some point +# if set to -1 the max number of snapshots is unlimited +--max-retained-snapshots=-1 + +# by default query engine runs in interpret mode +--interpret=true + +# database recovering is disabled by default +--recover-on-startup=false + +# use ast caching +--ast-cache=true diff --git a/libs/setup.sh b/libs/setup.sh index 97bee702e..61b794324 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -100,3 +100,11 @@ wget -nv http://deps.memgraph.io/postgresql-9.6.5-1-linux-x64-binaries.tar.gz -O tar -xzf postgres.tar.gz mv pgsql postgresql rm postgres.tar.gz + +# nlohmann json +# We wget header instead of cloning repo since repo is huge (lots of test data). +# We use head on Sep 1, 2017 instead of last release since it was long time ago. +mkdir json +cd json +wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp" +cd .. diff --git a/src/communication/bolt/client.hpp b/src/communication/bolt/client.hpp index d105adef2..a8d34617e 100644 --- a/src/communication/bolt/client.hpp +++ b/src/communication/bolt/client.hpp @@ -46,8 +46,9 @@ struct QueryData { template <typename Socket> class Client { public: - Client(Socket &&socket, std::string &username, std::string &password, - std::string client_name = "") + Client(Socket &&socket, const std::string &username, + const std::string &password, + const std::string &client_name = "memgraph-bolt/0.0.1") : socket_(std::move(socket)) { DLOG(INFO) << "Sending handshake"; if (!socket_.Write(kPreamble, sizeof(kPreamble))) { @@ -68,10 +69,6 @@ class Client { } buffer_.Shift(sizeof(kProtocol)); - if (client_name == "") { - client_name = "memgraph-bolt/0.0.1"; - } - DLOG(INFO) << "Sending init message"; if (!encoder_.MessageInit(client_name, {{"scheme", "basic"}, {"principal", username}, diff --git a/src/io/network/addrinfo.cpp b/src/io/network/addrinfo.cpp index 28a954324..0dd92449a 100644 --- a/src/io/network/addrinfo.cpp +++ b/src/io/network/addrinfo.cpp @@ -7,11 +7,11 @@ namespace io::network { -AddrInfo::AddrInfo(struct addrinfo* info) : info(info) {} +AddrInfo::AddrInfo(struct addrinfo *info) : info(info) {} AddrInfo::~AddrInfo() { freeaddrinfo(info); } -AddrInfo AddrInfo::Get(const char* addr, const char* port) { +AddrInfo AddrInfo::Get(const char *addr, const char *port) { struct addrinfo hints; memset(&hints, 0, sizeof(struct addrinfo)); @@ -19,7 +19,7 @@ AddrInfo AddrInfo::Get(const char* addr, const char* port) { hints.ai_socktype = SOCK_STREAM; // TCP socket hints.ai_flags = AI_PASSIVE; - struct addrinfo* result; + struct addrinfo *result; auto status = getaddrinfo(addr, port, &hints, &result); if (status != 0) throw NetworkError(gai_strerror(status)); @@ -27,5 +27,5 @@ AddrInfo AddrInfo::Get(const char* addr, const char* port) { return AddrInfo(result); } -AddrInfo::operator struct addrinfo*() { return info; } +AddrInfo::operator struct addrinfo *() { return info; } } diff --git 
a/src/io/network/addrinfo.hpp b/src/io/network/addrinfo.hpp index ae082bd11..cd749bf9d 100644 --- a/src/io/network/addrinfo.hpp +++ b/src/io/network/addrinfo.hpp @@ -7,16 +7,16 @@ namespace io::network { * see: man 3 getaddrinfo */ class AddrInfo { - AddrInfo(struct addrinfo* info); + AddrInfo(struct addrinfo *info); public: ~AddrInfo(); - static AddrInfo Get(const char* addr, const char* port); + static AddrInfo Get(const char *addr, const char *port); - operator struct addrinfo*(); + operator struct addrinfo *(); private: - struct addrinfo* info; + struct addrinfo *info; }; } diff --git a/src/io/network/network_endpoint.cpp b/src/io/network/network_endpoint.cpp index 218dcd874..a8e0441f4 100644 --- a/src/io/network/network_endpoint.cpp +++ b/src/io/network/network_endpoint.cpp @@ -11,7 +11,7 @@ NetworkEndpoint::NetworkEndpoint() : port_(0), family_(0) { memset(port_str_, 0, sizeof port_str_); } -NetworkEndpoint::NetworkEndpoint(const char* addr, const char* port) { +NetworkEndpoint::NetworkEndpoint(const char *addr, const char *port) { if (addr == nullptr) throw NetworkEndpointException("Address can't be null!"); if (port == nullptr) throw NetworkEndpointException("Port can't be null!"); @@ -19,27 +19,6 @@ NetworkEndpoint::NetworkEndpoint(const char* addr, const char* port) { snprintf(address_, sizeof address_, "%s", addr); snprintf(port_str_, sizeof port_str_, "%s", port); - is_address_valid(); - - int ret = sscanf(port, "%hu", &port_); - if (ret != 1) throw NetworkEndpointException("Port isn't valid!"); -} - -NetworkEndpoint::NetworkEndpoint(const std::string& addr, - const std::string& port) - : NetworkEndpoint(addr.c_str(), port.c_str()) {} - -NetworkEndpoint::NetworkEndpoint(const char* addr, unsigned short port) { - if (addr == nullptr) throw NetworkEndpointException("Address can't be null!"); - - snprintf(address_, sizeof address_, "%s", addr); - snprintf(port_str_, sizeof port_str_, "%hu", port); - port_ = port; - - is_address_valid(); -} - -void NetworkEndpoint::is_address_valid() { in_addr addr4; in6_addr addr6; int ret = inet_pton(AF_INET, address_, &addr4); @@ -52,10 +31,15 @@ void NetworkEndpoint::is_address_valid() { family_ = 6; } else family_ = 4; + + ret = sscanf(port, "%hu", &port_); + if (ret != 1) throw NetworkEndpointException("Port isn't valid!"); } -const char* NetworkEndpoint::address() { return address_; } -const char* NetworkEndpoint::port_str() { return port_str_; } -unsigned short NetworkEndpoint::port() { return port_; } -unsigned char NetworkEndpoint::family() { return family_; } +NetworkEndpoint::NetworkEndpoint(const std::string &addr, + const std::string &port) + : NetworkEndpoint(addr.c_str(), port.c_str()) {} + +NetworkEndpoint::NetworkEndpoint(const std::string &addr, unsigned short port) + : NetworkEndpoint(addr.c_str(), std::to_string(port)) {} } diff --git a/src/io/network/network_endpoint.hpp b/src/io/network/network_endpoint.hpp index 506527a30..35f91534a 100644 --- a/src/io/network/network_endpoint.hpp +++ b/src/io/network/network_endpoint.hpp @@ -20,18 +20,16 @@ class NetworkEndpointException : public utils::BasicException { class NetworkEndpoint { public: NetworkEndpoint(); - NetworkEndpoint(const char* addr, const char* port); - NetworkEndpoint(const char* addr, unsigned short port); - NetworkEndpoint(const std::string& addr, const std::string& port); + NetworkEndpoint(const std::string &addr, const std::string &port); + NetworkEndpoint(const char *addr, const char *port); + NetworkEndpoint(const std::string &addr, unsigned short port); - 
const char* address(); - const char* port_str(); - unsigned short port(); - unsigned char family(); + const char *address() const { return address_; } + const char *port_str() const { return port_str_; } + int port() const { return port_; } + unsigned char family() const { return family_; } private: - void is_address_valid(); - char address_[INET6_ADDRSTRLEN]; char port_str_[6]; unsigned short port_; diff --git a/src/io/network/socket.cpp b/src/io/network/socket.cpp index aea432c39..43390b736 100644 --- a/src/io/network/socket.cpp +++ b/src/io/network/socket.cpp @@ -10,10 +10,10 @@ #include <errno.h> #include <fcntl.h> #include <netdb.h> -#include <sys/epoll.h> -#include <sys/socket.h> #include <netinet/in.h> #include <netinet/tcp.h> +#include <sys/epoll.h> +#include <sys/socket.h> #include <sys/types.h> #include <unistd.h> @@ -24,14 +24,14 @@ namespace io::network { Socket::Socket() : socket_(-1) {} -Socket::Socket(int sock, NetworkEndpoint& endpoint) +Socket::Socket(int sock, const NetworkEndpoint &endpoint) : socket_(sock), endpoint_(endpoint) {} -Socket::Socket(const Socket& s) : socket_(s.id()) {} +Socket::Socket(const Socket &s) : socket_(s.id()) {} -Socket::Socket(Socket&& other) { *this = std::forward<Socket>(other); } +Socket::Socket(Socket &&other) { *this = std::forward<Socket>(other); } -Socket& Socket::operator=(Socket&& other) { +Socket &Socket::operator=(Socket &&other) { socket_ = other.socket_; endpoint_ = other.endpoint_; other.socket_ = -1; @@ -51,12 +51,12 @@ void Socket::Close() { bool Socket::IsOpen() { return socket_ != -1; } -bool Socket::Connect(NetworkEndpoint& endpoint) { +bool Socket::Connect(const NetworkEndpoint &endpoint) { if (UNLIKELY(socket_ != -1)) return false; auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str()); - for (struct addrinfo* it = info; it != nullptr; it = it->ai_next) { + for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) { int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol); if (sfd == -1) continue; if (connect(sfd, it->ai_addr, it->ai_addrlen) == 0) { @@ -70,12 +70,12 @@ bool Socket::Connect(NetworkEndpoint& endpoint) { return true; } -bool Socket::Bind(NetworkEndpoint& endpoint) { +bool Socket::Bind(const NetworkEndpoint &endpoint) { if (UNLIKELY(socket_ != -1)) return false; auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str()); - for (struct addrinfo* it = info; it != nullptr; it = it->ai_next) { + for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) { int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol); if (sfd == -1) continue; @@ -94,7 +94,7 @@ bool Socket::Bind(NetworkEndpoint& endpoint) { // detect bound port, used when the server binds to a random port struct sockaddr_in6 portdata; socklen_t portdatalen = sizeof(portdata); - if (getsockname(socket_, (struct sockaddr *) &portdata, &portdatalen) < 0) { + if (getsockname(socket_, (struct sockaddr *)&portdata, &portdatalen) < 0) { return false; } @@ -122,16 +122,16 @@ bool Socket::SetKeepAlive() { if (setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) return false; - optval = 20; // wait 120s before seding keep-alive packets - if (setsockopt(socket_, SOL_TCP, TCP_KEEPIDLE, (void*)&optval, optlen) < 0) + optval = 20; // wait 120s before seding keep-alive packets + if (setsockopt(socket_, SOL_TCP, TCP_KEEPIDLE, (void *)&optval, optlen) < 0) return false; - optval = 4; // 4 keep-alive packets must fail to close - if (setsockopt(socket_, SOL_TCP, TCP_KEEPCNT, (void*)&optval, optlen) < 0) + 
optval = 4; // 4 keep-alive packets must fail to close + if (setsockopt(socket_, SOL_TCP, TCP_KEEPCNT, (void *)&optval, optlen) < 0) return false; - optval = 15; // send keep-alive packets every 15s - if (setsockopt(socket_, SOL_TCP, TCP_KEEPINTVL, (void*)&optval, optlen) < 0) + optval = 15; // send keep-alive packets every 15s + if (setsockopt(socket_, SOL_TCP, TCP_KEEPINTVL, (void *)&optval, optlen) < 0) return false; return true; @@ -141,7 +141,7 @@ bool Socket::SetNoDelay() { int optval = 1; socklen_t optlen = sizeof(optval); - if (setsockopt(socket_, SOL_TCP, TCP_NODELAY, (void*)&optval, optlen) < 0) + if (setsockopt(socket_, SOL_TCP, TCP_NODELAY, (void *)&optval, optlen) < 0) return false; return true; @@ -163,24 +163,24 @@ bool Socket::SetTimeout(long sec, long usec) { bool Socket::Listen(int backlog) { return listen(socket_, backlog) == 0; } -bool Socket::Accept(Socket* s) { +bool Socket::Accept(Socket *s) { sockaddr_storage addr; socklen_t addr_size = sizeof addr; char addr_decoded[INET6_ADDRSTRLEN]; - void* addr_src; + void *addr_src; unsigned short port; unsigned char family; - int sfd = accept(socket_, (struct sockaddr*)&addr, &addr_size); + int sfd = accept(socket_, (struct sockaddr *)&addr, &addr_size); if (UNLIKELY(sfd == -1)) return false; if (addr.ss_family == AF_INET) { - addr_src = (void*)&(((sockaddr_in*)&addr)->sin_addr); - port = ntohs(((sockaddr_in*)&addr)->sin_port); + addr_src = (void *)&(((sockaddr_in *)&addr)->sin_addr); + port = ntohs(((sockaddr_in *)&addr)->sin_port); family = 4; } else { - addr_src = (void*)&(((sockaddr_in6*)&addr)->sin6_addr); - port = ntohs(((sockaddr_in6*)&addr)->sin6_port); + addr_src = (void *)&(((sockaddr_in6 *)&addr)->sin6_addr); + port = ntohs(((sockaddr_in6 *)&addr)->sin6_port); family = 6; } @@ -189,7 +189,7 @@ bool Socket::Accept(Socket* s) { NetworkEndpoint endpoint; try { endpoint = NetworkEndpoint(addr_decoded, port); - } catch (NetworkEndpointException& e) { + } catch (NetworkEndpointException &e) { return false; } @@ -201,17 +201,17 @@ bool Socket::Accept(Socket* s) { Socket::operator int() { return socket_; } int Socket::id() const { return socket_; } -NetworkEndpoint& Socket::endpoint() { return endpoint_; } +const NetworkEndpoint &Socket::endpoint() const { return endpoint_; } -bool Socket::Write(const std::string& str) { +bool Socket::Write(const std::string &str) { return Write(str.c_str(), str.size()); } -bool Socket::Write(const char* data, size_t len) { - return Write(reinterpret_cast<const uint8_t*>(data), len); +bool Socket::Write(const char *data, size_t len) { + return Write(reinterpret_cast<const uint8_t *>(data), len); } -bool Socket::Write(const uint8_t* data, size_t len) { +bool Socket::Write(const uint8_t *data, size_t len) { while (len > 0) { // MSG_NOSIGNAL is here to disable raising a SIGPIPE // signal when a connection dies mid-write, the socket @@ -224,7 +224,7 @@ bool Socket::Write(const uint8_t* data, size_t len) { return true; } -int Socket::Read(void* buffer, size_t len) { +int Socket::Read(void *buffer, size_t len) { return read(socket_, buffer, len); } } diff --git a/src/io/network/socket.hpp b/src/io/network/socket.hpp index 3fa02ed2b..730ae479c 100644 --- a/src/io/network/socket.hpp +++ b/src/io/network/socket.hpp @@ -15,9 +15,9 @@ namespace io::network { class Socket { public: Socket(); - Socket(const Socket& s); - Socket(Socket&& other); - Socket& operator=(Socket&& other); + Socket(const Socket &s); + Socket(Socket &&other); + Socket &operator=(Socket &&other); ~Socket(); /** @@ -43,7 +43,7 @@ 
class Socket { * true if the connect succeeded * false if the connect failed */ - bool Connect(NetworkEndpoint& endpoint); + bool Connect(const NetworkEndpoint &endpoint); /** * Binds the socket to the specified endpoint. @@ -54,12 +54,13 @@ class Socket { * true if the bind succeeded * false if the bind failed */ - bool Bind(NetworkEndpoint& endpoint); + bool Bind(const NetworkEndpoint &endpoint); /** * Start listening on the bound socket. * - * @param backlog maximum number of pending connections in the connection queue + * @param backlog maximum number of pending connections in the connection + * queue * * @return listen success status: * true if the listen succeeded @@ -74,10 +75,11 @@ class Socket { * @param s Socket object that will be instantiated with the new connection * * @return accept success status: - * true if a new connection was accepted and the socket 's' was instantiated + * true if a new connection was accepted and the socket 's' was + * instantiated * false if a new connection accept failed */ - bool Accept(Socket* s); + bool Accept(Socket *s); /** * Sets the socket to non-blocking. @@ -132,7 +134,7 @@ class Socket { /** * Returns the currently active endpoint of the socket. */ - NetworkEndpoint& endpoint(); + const NetworkEndpoint &endpoint() const; /** * Write data to the socket. @@ -146,9 +148,9 @@ class Socket { * true if write succeeded * false if write failed */ - bool Write(const std::string& str); - bool Write(const char* data, size_t len); - bool Write(const uint8_t* data, size_t len); + bool Write(const std::string &str); + bool Write(const char *data, size_t len); + bool Write(const uint8_t *data, size_t len); /** * Read data from the socket. @@ -162,10 +164,10 @@ class Socket { * == 0 if the client closed the connection * < 0 if an error has occurred */ - int Read(void* buffer, size_t len); + int Read(void *buffer, size_t len); private: - Socket(int sock, NetworkEndpoint& endpoint); + Socket(int sock, const NetworkEndpoint &endpoint); int socket_; NetworkEndpoint endpoint_; diff --git a/tests/macro_benchmark/CMakeLists.txt b/tests/macro_benchmark/CMakeLists.txt index 31d06c37f..2ec5a9a1a 100644 --- a/tests/macro_benchmark/CMakeLists.txt +++ b/tests/macro_benchmark/CMakeLists.txt @@ -10,6 +10,10 @@ message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}") # postgres directory set(postgres_dir ${libs_dir}/postgresql) +# add target that depends on all other targets +set(all_targets_target ${project_name}__${test_type}) +add_custom_target(${all_targets_target}) + # for each cpp file build binary and register test foreach(test_cpp ${test_type_cpps}) @@ -35,4 +39,7 @@ foreach(test_cpp ${test_type_cpps}) target_link_libraries(${target_name} "${postgres_dir}/lib/libpq.so") target_include_directories(${target_name} PUBLIC "${postgres_dir}/include") + # add target to dependencies + add_dependencies(${all_targets_target} ${target_name}) + endforeach() diff --git a/tests/macro_benchmark/harness/clients.py b/tests/macro_benchmark/harness/clients.py new file mode 100644 index 000000000..f61f2a7d9 --- /dev/null +++ b/tests/macro_benchmark/harness/clients.py @@ -0,0 +1,136 @@ +import logging +import os +import time +import json +import tempfile +from common import get_absolute_path, WALL_TIME, CPU_TIME + +log = logging.getLogger(__name__) + +try: + import jail + APOLLO = True +except: + import jail_faker as jail + APOLLO = False + + +# This could be a function, not a class, but we want to reuse jail process since +# we can instantiate only 8 of them. 
+class QueryClient: + def __init__(self, args, cpus=None): + self.log = logging.getLogger("QueryClient") + self.client = jail.get_process() + if cpus: + self.client.set_cpus(cpus) + + def __call__(self, queries, database, num_client_workers): + self.log.debug("execute('%s')", str(queries)) + + client_path = "tests/macro_benchmark/query_client" + client = get_absolute_path(client_path, "build") + if not os.path.exists(client): + # Apollo builds both debug and release binaries on diff + # so we need to use the release client if the debug one + # doesn't exist + client = get_absolute_path(client_path, "build_release") + + queries_fd, queries_path = tempfile.mkstemp() + try: + queries_file = os.fdopen(queries_fd, "w") + queries_file.write("\n".join(queries)) + queries_file.close() + except: + queries_file.close() + os.remove(queries_path) + raise Exception("Writing queries to temporary file failed") + + output_fd, output = tempfile.mkstemp() + os.close(output_fd) + + client_args = ["--port", database.args.port, + "--num-workers", str(num_client_workers), + "--output", output] + + cpu_time_start = database.database_bin.get_usage()["cpu"] + # TODO make the timeout configurable per query or something + return_code = self.client.run_and_wait( + client, client_args, timeout=600, stdin=queries_path) + cpu_time_end = database.database_bin.get_usage()["cpu"] + os.remove(queries_path) + if return_code != 0: + with open(self.client.get_stderr()) as f: + stderr = f.read() + self.log.error("Error while executing queries '%s'. " + "Failed with return_code %d and stderr:\n%s", + str(queries), return_code, stderr) + raise Exception("BoltClient execution failed") + + with open(output) as f: + data = json.loads(f.read()) + data[CPU_TIME] = cpu_time_end - cpu_time_start + + os.remove(output) + return data + + +class LongRunningClient: + def __init__(self, args, cpus=None): + self.log = logging.getLogger("LongRunningClient") + self.client = jail.get_process() + if cpus: + self.client.set_cpus(cpus) + + # TODO: This is quite similar to __call__ method of QueryClient. Remove + # duplication. + def __call__(self, config, database, duration, num_client_workers): + self.log.debug("execute('%s')", config) + + client_path = "tests/macro_benchmark/long_running_client" + client = get_absolute_path(client_path, "build") + if not os.path.exists(client): + # Apollo builds both debug and release binaries on diff + # so we need to use the release client if the debug one + # doesn't exist + client = get_absolute_path(client_path, "build_release") + + config_fd, config_path = tempfile.mkstemp() + try: + config_file = os.fdopen(config_fd, "w") + print(json.dumps(config, indent=4), file=config_file) + config_file.close() + except: + config_file.close() + os.remove(config_path) + raise Exception("Writing config to temporary file failed") + + output_fd, output = tempfile.mkstemp() + os.close(output_fd) + + client_args = ["--port", database.args.port, + "--num-workers", str(num_client_workers), + "--output", output, + "--duration", str(duration)] + + return_code = self.client.run_and_wait( + client, client_args, timeout=600, stdin=config_path) + os.remove(config_path) + if return_code != 0: + with open(self.client.get_stderr()) as f: + stderr = f.read() + self.log.error("Error while executing config '%s'. " + "Failed with return_code %d and stderr:\n%s", + str(config), return_code, stderr) + raise Exception("BoltClient execution failed") + + + # TODO: We shouldn't wait for process to finish to start reading output. 
+ # We should implement periodic reading of data and stream data when it + # becomes available. + data = [] + with open(output) as f: + for line in f: + data.append(json.loads(line)) + + os.remove(output) + return data diff --git a/tests/macro_benchmark/harness/clients/common.hpp b/tests/macro_benchmark/harness/clients/common.hpp index 3b9e7dcda..56aca1b71 100644 --- a/tests/macro_benchmark/harness/clients/common.hpp +++ b/tests/macro_benchmark/harness/clients/common.hpp @@ -1,7 +1,10 @@ +#include <experimental/optional> +#include <map> #include <string> #include "communication/bolt/client.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "utils/exceptions.hpp" namespace { @@ -47,15 +50,17 @@ void PrintJsonDecodedValue(std::ostream &os, } } -template <typename ClientT, typename ExceptionT> +template <typename TClient> communication::bolt::QueryData ExecuteNTimesTillSuccess( - ClientT &client, const std::string &query, int times) { - ExceptionT last_exception; + TClient &client, const std::string &query, + const std::map<std::string, communication::bolt::DecodedValue> ¶ms, + int times) { + std::experimental::optional<utils::BasicException> last_exception; for (int i = 0; i < times; ++i) { try { - auto ret = client.Execute(query, {}); + auto ret = client.Execute(query, params); return ret; - } catch (const ExceptionT &e) { + } catch (const utils::BasicException &e) { last_exception = e; } } diff --git a/tests/macro_benchmark/harness/clients/long_running_client.cpp b/tests/macro_benchmark/harness/clients/long_running_client.cpp new file mode 100644 index 000000000..6084cce38 --- /dev/null +++ b/tests/macro_benchmark/harness/clients/long_running_client.cpp @@ -0,0 +1,267 @@ +// TODO: work in progress. +#include <array> +#include <chrono> +#include <fstream> +#include <iostream> +#include <queue> +#include <random> +#include <sstream> +#include <vector> + +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <json/json.hpp> + +#include "common.hpp" +#include "communication/bolt/client.hpp" +#include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "io/network/network_endpoint.hpp" +#include "io/network/socket.hpp" +#include "threading/sync/spinlock.hpp" +#include "utils/algorithm.hpp" +#include "utils/algorithm.hpp" +#include "utils/assert.hpp" +#include "utils/timer.hpp" + +using SocketT = io::network::Socket; +using EndpointT = io::network::NetworkEndpoint; +using Client = communication::bolt::Client<SocketT>; +using communication::bolt::DecodedValue; +using communication::bolt::DecodedVertex; +using communication::bolt::DecodedEdge; + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_string(port, "7687", "Server port"); +DEFINE_int32(num_workers, 1, "Number of workers"); +DEFINE_string(output, "", "Output file"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_int32(duration, 30, "Number of seconds to execute benchmark"); + +const int MAX_RETRIES = 30; +const int NUM_BUCKETS = 100; + +struct VertexAndEdges { + DecodedVertex vertex; + std::vector<DecodedEdge> edges; + std::vector<DecodedVertex> vertices; +}; + +std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client, + const std::string &label, + int64_t id) { + auto records = + ExecuteNTimesTillSuccess( + client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m", + std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES) + .records; + + if (records.size() == 0U) return {{}, 1}; + + 
ExecuteNTimesTillSuccess( + client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n", + std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES); + + std::vector<DecodedEdge> edges; + edges.reserve(records.size()); + for (const auto &record : records) { + edges.push_back(record[1].ValueEdge()); + } + + std::vector<DecodedVertex> vertices; + vertices.reserve(records.size()); + for (const auto &record : records) { + vertices.push_back(record[2].ValueVertex()); + } + + return {{records[0][0].ValueVertex(), edges, vertices}, 2}; +} + +int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges, + const std::string &independent_label) { + int num_queries = 0; + { + std::stringstream os; + os << "CREATE (n :"; + PrintIterable(os, vertex_and_edges.vertex.labels, ":"); + os << " {"; + PrintIterable(os, vertex_and_edges.vertex.properties, ", ", + [&](auto &stream, const auto &pair) { + if (pair.second.type() == DecodedValue::Type::String) { + stream << pair.first << ": \"" << pair.second << "\""; + } else { + stream << pair.first << ": " << pair.second; + } + }); + os << "})"; + ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES); + ++num_queries; + } + + for (int i = 0; i < static_cast<int>(vertex_and_edges.vertices.size()); ++i) { + std::stringstream os; + os << "MATCH (n :" << independent_label + << " {id: " << vertex_and_edges.vertex.properties.at("id") << "}) "; + os << "MATCH (m :" << independent_label + << " {id: " << vertex_and_edges.vertices[i].properties.at("id") << "}) "; + const auto &edge = vertex_and_edges.edges[i]; + os << "CREATE (n)"; + if (edge.to == vertex_and_edges.vertex.id) { + os << "<-"; + } else { + os << "-"; + } + os << "[:" << edge.type << " {"; + PrintIterable(os, edge.properties, ", ", + [&](auto &stream, const auto &pair) { + if (pair.second.type() == DecodedValue::Type::String) { + stream << pair.first << ": \"" << pair.second << "\""; + } else { + stream << pair.first << ": " << pair.second; + } + }); + os << "}]"; + if (edge.from == vertex_and_edges.vertex.id) { + os << "->"; + } else { + os << "-"; + } + os << "(m)"; + os << " RETURN n.id"; + auto ret = ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES); + auto x = ret.metadata["plan_execution_time"]; + auto y = ret.metadata["planning_time"]; + if (x.type() == DecodedValue::Type::Double) { + LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning " + << y.ValueDouble(); + CHECK(ret.records.size() == 1U) << "Graph in invalid state"; + } + ++num_queries; + } + return num_queries; +} + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + nlohmann::json config; + std::cin >> config; + const auto &queries = config["queries"]; + const double read_probability = config["read_probability"]; + const int64_t num_independent_nodes = config["num_independent_nodes"]; + const std::string independent_label = config["independent_label"]; + const int64_t num_nodes = config["num_nodes"]; + + utils::Timer timer; + std::vector<std::thread> threads; + std::atomic<int64_t> executed_queries{0}; + std::atomic<bool> keep_running{true}; + + for (int i = 0; i < FLAGS_num_workers; ++i) { + threads.emplace_back( + [&](int thread_id) { + // Initialise client. 
+ SocketT socket; + EndpointT endpoint; + try { + endpoint = EndpointT(FLAGS_address, FLAGS_port); + } catch (const io::network::NetworkEndpointException &e) { + LOG(FATAL) << "Invalid address or port: " << FLAGS_address << ":" + << FLAGS_port; + } + if (!socket.Connect(endpoint)) { + LOG(FATAL) << "Could not connect to: " << FLAGS_address << ":" + << FLAGS_port; + } + Client client(std::move(socket), FLAGS_username, FLAGS_password); + + std::mt19937 random_gen(thread_id); + int64_t to_remove = + num_independent_nodes / FLAGS_num_workers * thread_id + 1; + int64_t last_to_remove = + to_remove + num_independent_nodes / FLAGS_num_workers; + bool remove = true; + int64_t num_shifts = 0; + std::vector<VertexAndEdges> removed; + + while (keep_running) { + std::uniform_real_distribution<> real_dist(0.0, 1.0); + + // Read query. + if (real_dist(random_gen) < read_probability) { + std::uniform_int_distribution<> read_query_dist( + 0, static_cast<int>(queries.size()) - 1); + const auto &query = queries[read_query_dist(random_gen)]; + std::map<std::string, DecodedValue> params; + for (const auto ¶m : query["params"]) { + std::uniform_int_distribution<int64_t> param_value_dist( + param["low"], param["high"]); + params[param["name"]] = param_value_dist(random_gen); + } + ExecuteNTimesTillSuccess(client, query["query"], params, + MAX_RETRIES); + ++executed_queries; + } else { + if (!remove) { + executed_queries += ReturnVertexAndEdges(client, removed.back(), + independent_label); + removed.pop_back(); + if (removed.empty()) { + remove = true; + } + } else { + auto ret = + DetachDeleteVertex(client, independent_label, to_remove); + ++to_remove; + executed_queries += ret.second; + if (ret.second > 1) { + removed.push_back(std::move(ret.first)); + } + if (to_remove == last_to_remove) { + for (auto &x : removed) { + x.vertex.properties["id"].ValueInt() += num_nodes; + } + remove = false; + ++num_shifts; + to_remove = + num_independent_nodes / FLAGS_num_workers * thread_id + + 1 + num_shifts * num_nodes; + last_to_remove = + to_remove + num_independent_nodes / FLAGS_num_workers; + } + } + } + } + + client.Close(); + }, + i); + } + + // Open stream for writing stats. 
+ std::streambuf *buf; + std::ofstream f; + if (FLAGS_output != "") { + f.open(FLAGS_output); + buf = f.rdbuf(); + } else { + buf = std::cout.rdbuf(); + } + std::ostream out(buf); + + while (timer.Elapsed().count() < FLAGS_duration) { + using namespace std::chrono_literals; + out << "{ \"num_executed_queries\": " << executed_queries << ", " + << "\"elapsed_time\": " << timer.Elapsed().count() << "}" << std::endl; + out.flush(); + std::this_thread::sleep_for(1s); + } + keep_running = false; + + for (int i = 0; i < FLAGS_num_workers; ++i) { + threads[i].join(); + } + + return 0; +} diff --git a/tests/macro_benchmark/harness/clients/postgres_client.hpp b/tests/macro_benchmark/harness/clients/postgres_client.hpp index c6c90decd..8d78fecf2 100644 --- a/tests/macro_benchmark/harness/clients/postgres_client.hpp +++ b/tests/macro_benchmark/harness/clients/postgres_client.hpp @@ -6,7 +6,6 @@ #include <fmt/format.h> #include <glog/logging.h> - #include <libpq-fe.h> #include "communication/bolt/client.hpp" @@ -49,10 +48,13 @@ class Client { } } - QueryData Execute(const std::string &query, - const std::map<std::string, std::string> ¶meters) { + QueryData Execute( + const std::string &query, + const std::map<std::string, communication::bolt::DecodedValue> + ¶meters) { QueryData ret; + CHECK(parameters.size() == 0U) << "Parameters not yet supported"; DLOG(INFO) << "Sending run message with statement: '" << query << "'"; result_ = PQexec(connection_, query.c_str()); diff --git a/tests/macro_benchmark/harness/clients/harness_client.cpp b/tests/macro_benchmark/harness/clients/query_client.cpp similarity index 77% rename from tests/macro_benchmark/harness/clients/harness_client.cpp rename to tests/macro_benchmark/harness/clients/query_client.cpp index 910111dfe..941e559a9 100644 --- a/tests/macro_benchmark/harness/clients/harness_client.cpp +++ b/tests/macro_benchmark/harness/clients/query_client.cpp @@ -4,6 +4,7 @@ #include <gflags/gflags.h> #include <glog/logging.h> +#include "communication/bolt/client.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" #include "threading/sync/spinlock.hpp" #include "utils/algorithm.hpp" @@ -11,7 +12,7 @@ #include "bolt_client.hpp" #include "common.hpp" -#include "postgres_client.hpp" +//#include "postgres_client.hpp" DEFINE_string(protocol, "bolt", "Protocol to use (available: bolt, postgres)"); DEFINE_int32(num_workers, 1, "Number of workers"); @@ -83,9 +84,8 @@ void ExecuteQueries(std::istream &istream, int num_workers, str = queries[pos]; } try { - metadata[pos] = ExecuteNTimesTillSuccess<ClientT, ExceptionT>( - client, str, MAX_RETRIES) - .metadata; + metadata[pos] = + ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata; } catch (const ExceptionT &e) { LOG(FATAL) << "Could not execute query '" << str << "' " << MAX_RETRIES << " times! 
Error message: " << e.what(); @@ -105,12 +105,6 @@ void ExecuteQueries(std::istream &istream, int num_workers, PrintSummary(ostream, duration, metadata); } -using BoltClientT = BoltClient; -using BoltExceptionT = communication::bolt::ClientQueryException; - -using PostgresClientT = postgres::Client; -using PostgresExceptionT = postgres::ClientQueryException; - int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); @@ -134,18 +128,27 @@ int main(int argc, char **argv) { std::string port = FLAGS_port; if (FLAGS_protocol == "bolt") { if (port == "") port = "7687"; + + using BoltClientT = BoltClient; + using BoltExceptionT = communication::bolt::ClientQueryException; ExecuteQueries<BoltClientT, BoltExceptionT>( *istream, FLAGS_num_workers, *ostream, FLAGS_address, port, FLAGS_username, FLAGS_password, FLAGS_database); } else if (FLAGS_protocol == "postgres") { - permanent_assert(FLAGS_username != "", - "Username can't be empty for postgres!"); - permanent_assert(FLAGS_database != "", - "Database can't be empty for postgres!"); - if (port == "") port = "5432"; - ExecuteQueries<PostgresClientT, PostgresExceptionT>( - *istream, FLAGS_num_workers, *ostream, FLAGS_address, port, - FLAGS_username, FLAGS_password, FLAGS_database); + LOG(FATAL) << "Postgres not yet supported"; + // TODO: Currently libpq is linked dynamically so it is a pain to move + // harness_client executable to other machines without libpq. + // CHECK(FLAGS_username != "") << "Username can't be empty for + // postgres!"; + // CHECK(FLAGS_database != "") << "Database can't be empty for + // postgres!"; + // if (port == "") port = "5432"; + // + // using PostgresClientT = postgres::Client; + // using PostgresExceptionT = postgres::ClientQueryException; + // ExecuteQueries<PostgresClientT, PostgresExceptionT>( + // *istream, FLAGS_num_workers, *ostream, FLAGS_address, port, + // FLAGS_username, FLAGS_password, FLAGS_database); } return 0; diff --git a/tests/macro_benchmark/harness/common.py b/tests/macro_benchmark/harness/common.py new file mode 100644 index 000000000..e5cab0d41 --- /dev/null +++ b/tests/macro_benchmark/harness/common.py @@ -0,0 +1,19 @@ +import os + +WALL_TIME = "wall_time" +CPU_TIME = "cpu_time" + +DIR_PATH = os.path.dirname(os.path.realpath(__file__)) + +def get_absolute_path(path, base=""): + if base == "build": + extra = "../../../build" + elif base == "build_release": + extra = "../../../build_release" + elif base == "libs": + extra = "../../../libs" + elif base == "config": + extra = "../../../config" + else: + extra = "" + return os.path.normpath(os.path.join(DIR_PATH, extra, path)) diff --git a/tests/macro_benchmark/harness/config/neo4j.conf b/tests/macro_benchmark/harness/config/neo4j.conf index a18b76851..bdfb44461 100644 --- a/tests/macro_benchmark/harness/config/neo4j.conf +++ b/tests/macro_benchmark/harness/config/neo4j.conf @@ -322,4 +322,3 @@ dbms.udc.enabled=false # Disable query cache dbms.query_cache_size=0 - diff --git a/tests/macro_benchmark/harness/config/neo4j_long_running.conf b/tests/macro_benchmark/harness/config/neo4j_long_running.conf new file mode 100644 index 000000000..30ad8ca40 --- /dev/null +++ b/tests/macro_benchmark/harness/config/neo4j_long_running.conf @@ -0,0 +1,324 @@ +#***************************************************************** +# Neo4j configuration +# +# For more details and a complete list of settings, please see +# https://neo4j.com/docs/operations-manual/current/reference/configuration-settings/ 
+#***************************************************************** + +# The name of the database to mount +#dbms.active_database=graph.db + +# Paths of directories in the installation. +#dbms.directories.data=/var/lib/neo4j/data +#dbms.directories.plugins=/var/lib/neo4j/plugins +#dbms.directories.certificates=/var/lib/neo4j/certificates +#dbms.directories.logs=/var/log/neo4j +#dbms.directories.lib=/usr/share/neo4j/lib +#dbms.directories.run=/var/run/neo4j + +# This setting constrains all `LOAD CSV` import files to be under the `import` directory. Remove or comment it out to +# allow files to be loaded from anywhere in the filesystem; this introduces possible security problems. See the +# `LOAD CSV` section of the manual for details. +#dbms.directories.import=/var/lib/neo4j/import + +# Whether requests to Neo4j are authenticated. +# To disable authentication, uncomment this line +dbms.security.auth_enabled=false + +# Enable this to be able to upgrade a store from an older version. +#dbms.allow_format_migration=true + +# Java Heap Size: by default the Java heap size is dynamically +# calculated based on available system resources. +# Uncomment these lines to set specific initial and maximum +# heap size. +#dbms.memory.heap.initial_size=512m +#dbms.memory.heap.max_size=512m + +# The amount of memory to use for mapping the store files, in bytes (or +# kilobytes with the 'k' suffix, megabytes with 'm' and gigabytes with 'g'). +# If Neo4j is running on a dedicated server, then it is generally recommended +# to leave about 2-4 gigabytes for the operating system, give the JVM enough +# heap to hold all your transaction state and query context, and then leave the +# rest for the page cache. +# The default page cache memory assumes the machine is dedicated to running +# Neo4j, and is heuristically set to 50% of RAM minus the max Java heap size. +#dbms.memory.pagecache.size=10g + +#***************************************************************** +# Network connector configuration +#***************************************************************** + +# With default configuration Neo4j only accepts local connections. +# To accept non-local connections, uncomment this line: +#dbms.connectors.default_listen_address=0.0.0.0 + +# You can also choose a specific network interface, and configure a non-default +# port for each connector, by setting their individual listen_address. + +# The address at which this server can be reached by its clients. This may be the server's IP address or DNS name, or +# it may be the address of a reverse proxy which sits in front of the server. This setting may be overridden for +# individual connectors below. +#dbms.connectors.default_advertised_address=localhost + +# You can also choose a specific advertised hostname or IP address, and +# configure an advertised port for each connector, by setting their +# individual advertised_address. + +# Bolt connector +dbms.connector.bolt.enabled=true +#dbms.connector.bolt.tls_level=OPTIONAL +#dbms.connector.bolt.listen_address=:7687 + +# HTTP Connector. There must be exactly one HTTP connector. +dbms.connector.http.enabled=true +#dbms.connector.http.listen_address=:7474 + +# HTTPS Connector. There can be zero or one HTTPS connectors. +dbms.connector.https.enabled=false +#dbms.connector.https.listen_address=:7473 + +# Number of Neo4j worker threads. 
+#dbms.threads.worker_count= + +#***************************************************************** +# SSL system configuration +#***************************************************************** + +# Names of the SSL policies to be used for the respective components. + +# The legacy policy is a special policy which is not defined in +# the policy configuration section, but rather derives from +# dbms.directories.certificates and associated files +# (by default: neo4j.key and neo4j.cert). Its use will be deprecated. + +# The policies to be used for connectors. +# +# N.B: Note that a connector must be configured to support/require +# SSL/TLS for the policy to actually be utilized. +# +# see: dbms.connector.*.tls_level + +#bolt.ssl_policy=legacy +#https.ssl_policy=legacy + +#***************************************************************** +# SSL policy configuration +#***************************************************************** + +# Each policy is configured under a separate namespace, e.g. +# dbms.ssl.policy.<policyname>.* +# +# The example settings below are for a new policy named 'default'. + +# The base directory for cryptographic objects. Each policy will by +# default look for its associated objects (keys, certificates, ...) +# under the base directory. +# +# Every such setting can be overriden using a full path to +# the respective object, but every policy will by default look +# for cryptographic objects in its base location. +# +# Mandatory setting + +#dbms.ssl.policy.default.base_directory=certificates/default + +# Allows the generation of a fresh private key and a self-signed +# certificate if none are found in the expected locations. It is +# recommended to turn this off again after keys have been generated. +# +# Keys should in general be generated and distributed offline +# by a trusted certificate authority (CA) and not by utilizing +# this mode. + +#dbms.ssl.policy.default.allow_key_generation=false + +# Enabling this makes it so that this policy ignores the contents +# of the trusted_dir and simply resorts to trusting everything. +# +# Use of this mode is discouraged. It would offer encryption but no security. + +#dbms.ssl.policy.default.trust_all=false + +# The private key for the default SSL policy. By default a file +# named private.key is expected under the base directory of the policy. +# It is mandatory that a key can be found or generated. + +#dbms.ssl.policy.default.private_key= + +# The private key for the default SSL policy. By default a file +# named public.crt is expected under the base directory of the policy. +# It is mandatory that a certificate can be found or generated. + +#dbms.ssl.policy.default.public_certificate= + +# The certificates of trusted parties. By default a directory named +# 'trusted' is expected under the base directory of the policy. It is +# mandatory to create the directory so that it exists, because it cannot +# be auto-created (for security purposes). +# +# To enforce client authentication client_auth must be set to 'require'! + +#dbms.ssl.policy.default.trusted_dir= + +# Client authentication setting. Values: none, optional, require +# The default is to require client authentication. +# +# Servers are always authenticated unless explicitly overridden +# using the trust_all setting. In a mutual authentication setup this +# should be kept at the default of require and trusted certificates +# must be installed in the trusted_dir. + +#dbms.ssl.policy.default.client_auth=require + +# A comma-separated list of allowed TLS versions. 
+# By default TLSv1, TLSv1.1 and TLSv1.2 are allowed. + +#dbms.ssl.policy.default.tls_versions= + +# A comma-separated list of allowed ciphers. +# The default ciphers are the defaults of the JVM platform. + +#dbms.ssl.policy.default.ciphers= + +#***************************************************************** +# Logging configuration +#***************************************************************** + +# To enable HTTP logging, uncomment this line +#dbms.logs.http.enabled=true + +# Number of HTTP logs to keep. +#dbms.logs.http.rotation.keep_number=5 + +# Size of each HTTP log that is kept. +#dbms.logs.http.rotation.size=20m + +# To enable GC Logging, uncomment this line +#dbms.logs.gc.enabled=true + +# GC Logging Options +# see http://docs.oracle.com/cd/E19957-01/819-0084-10/pt_tuningjava.html#wp57013 for more information. +#dbms.logs.gc.options=-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution + +# Number of GC logs to keep. +#dbms.logs.gc.rotation.keep_number=5 + +# Size of each GC log that is kept. +#dbms.logs.gc.rotation.size=20m + +# Size threshold for rotation of the debug log. If set to zero then no rotation will occur. Accepts a binary suffix "k", +# "m" or "g". +#dbms.logs.debug.rotation.size=20m + +# Maximum number of history files for the internal log. +#dbms.logs.debug.rotation.keep_number=7 + +#***************************************************************** +# Miscellaneous configuration +#***************************************************************** + +# Enable this to specify a parser other than the default one. +#cypher.default_language_version=3.0 + +# Determines if Cypher will allow using file URLs when loading data using +# `LOAD CSV`. Setting this value to `false` will cause Neo4j to fail `LOAD CSV` +# clauses that load data from the file system. +#dbms.security.allow_csv_import_from_file_urls=true + +# Retention policy for transaction logs needed to perform recovery and backups. +dbms.tx_log.rotation.retention_policy=1 days + +# Enable a remote shell server which Neo4j Shell clients can log in to. +#dbms.shell.enabled=true +# The network interface IP the shell will listen on (use 0.0.0.0 for all interfaces). +#dbms.shell.host=127.0.0.1 +# The port the shell will listen on, default is 1337. +#dbms.shell.port=1337 + +# Only allow read operations from this Neo4j instance. This mode still requires +# write access to the directory for lock purposes. +#dbms.read_only=false + +# Comma separated list of JAX-RS packages containing JAX-RS resources, one +# package name for each mountpoint. The listed package names will be loaded +# under the mountpoints specified. Uncomment this line to mount the +# org.neo4j.examples.server.unmanaged.HelloWorldResource.java from +# neo4j-server-examples under /examples/unmanaged, resulting in a final URL of +# http://localhost:7474/examples/unmanaged/helloworld/{nodeId} +#dbms.unmanaged_extension_classes=org.neo4j.examples.server.unmanaged=/examples/unmanaged + +#******************************************************************** +# JVM Parameters +#******************************************************************** + +# G1GC generally strikes a good balance between throughput and tail +# latency, without too much tuning. +dbms.jvm.additional=-XX:+UseG1GC + +# Have common exceptions keep producing stack traces, so they can be +# debugged regardless of how often logs are rotated. 
+dbms.jvm.additional=-XX:-OmitStackTraceInFastThrow + +# Make sure that `initmemory` is not only allocated, but committed to +# the process, before starting the database. This reduces memory +# fragmentation, increasing the effectiveness of transparent huge +# pages. It also reduces the possibility of seeing performance drop +# due to heap-growing GC events, where a decrease in available page +# cache leads to an increase in mean IO response time. +# Try reducing the heap memory, if this flag degrades performance. +dbms.jvm.additional=-XX:+AlwaysPreTouch + +# Trust that non-static final fields are really final. +# This allows more optimizations and improves overall performance. +# NOTE: Disable this if you use embedded mode, or have extensions or dependencies that may use reflection or +# serialization to change the value of final fields! +dbms.jvm.additional=-XX:+UnlockExperimentalVMOptions +dbms.jvm.additional=-XX:+TrustFinalNonStaticFields + +# Disable explicit garbage collection, which is occasionally invoked by the JDK itself. +dbms.jvm.additional=-XX:+DisableExplicitGC + +# Remote JMX monitoring, uncomment and adjust the following lines as needed. Absolute paths to jmx.access and +# jmx.password files are required. +# Also make sure to update the jmx.access and jmx.password files with appropriate permission roles and passwords, +# the shipped configuration contains only a read only role called 'monitor' with password 'Neo4j'. +# For more details, see: http://download.oracle.com/javase/8/docs/technotes/guides/management/agent.html +# On Unix based systems the jmx.password file needs to be owned by the user that will run the server, +# and have permissions set to 0600. +# For details on setting these file permissions on Windows see: +# http://docs.oracle.com/javase/8/docs/technotes/guides/management/security-windows.html +#dbms.jvm.additional=-Dcom.sun.management.jmxremote.port=3637 +#dbms.jvm.additional=-Dcom.sun.management.jmxremote.authenticate=true +#dbms.jvm.additional=-Dcom.sun.management.jmxremote.ssl=false +#dbms.jvm.additional=-Dcom.sun.management.jmxremote.password.file=/absolute/path/to/conf/jmx.password +#dbms.jvm.additional=-Dcom.sun.management.jmxremote.access.file=/absolute/path/to/conf/jmx.access + +# Some systems cannot discover host name automatically, and need this line configured: +#dbms.jvm.additional=-Djava.rmi.server.hostname=$THE_NEO4J_SERVER_HOSTNAME + +# Expand Diffie Hellman (DH) key size from default 1024 to 2048 for DH-RSA cipher suites used in server TLS handshakes. +# This is to protect the server from any potential passive eavesdropping. +dbms.jvm.additional=-Djdk.tls.ephemeralDHKeySize=2048 + +#******************************************************************** +# Wrapper Windows NT/2000/XP Service Properties +#******************************************************************** +# WARNING - Do not modify any of these properties when an application +# using this configuration file has been installed as a service. +# Please uninstall the service before modifying this section. The +# service can then be reinstalled. 
+ +# Name of the service +dbms.windows_service_name=neo4j + +#******************************************************************** +# Other Neo4j system properties +#******************************************************************** +dbms.jvm.additional=-Dunsupported.dbms.udc.source=debian + +# Disable Neo4j usage data collection +dbms.udc.enabled=false + +# Disable query cache +dbms.query_cache_size=1000 diff --git a/tests/macro_benchmark/harness/databases.py b/tests/macro_benchmark/harness/databases.py new file mode 100644 index 000000000..2801f069c --- /dev/null +++ b/tests/macro_benchmark/harness/databases.py @@ -0,0 +1,169 @@ +import logging +import os +import subprocess +from argparse import ArgumentParser +from collections import defaultdict +import tempfile +import shutil +import time +from common import get_absolute_path + +try: + import jail + APOLLO = True +except: + import jail_faker as jail + APOLLO = False + + +def wait_for_server(port, delay=1.0): + cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port] + while subprocess.call(cmd) != 0: + time.sleep(0.5) + time.sleep(delay) + + +class Memgraph: + """ + Knows how to start and stop memgraph. + """ + def __init__(self, args, config, num_workers, cpus=None): + self.log = logging.getLogger("MemgraphRunner") + argp = ArgumentParser("MemgraphArgumentParser") + argp.add_argument("--runner-bin", + default=get_absolute_path("memgraph", "build")) + argp.add_argument("--port", default="7687", + help="Database and client port") + self.log.info("Initializing Runner with arguments %r", args) + self.args, _ = argp.parse_known_args(args) + self.config = config + self.num_workers = num_workers + self.database_bin = jail.get_process() + if cpus: + self.database_bin.set_cpus(cpus) + + def start(self): + self.log.info("start") + env = {"MEMGRAPH_CONFIG": self.config} + database_args = ["--port", self.args.port] + if self.num_workers: + database_args += ["--num_workers", self.num_workers] + + # find executable path + runner_bin = self.args.runner_bin + if not os.path.exists(runner_bin): + # Apollo builds both debug and release binaries on diff + # so we need to use the release binary if the debug one + # doesn't exist + runner_bin = get_absolute_path("memgraph", "build_release") + + # start memgraph + self.database_bin.run(runner_bin, database_args, env=env, timeout=600) + wait_for_server(self.args.port) + + def stop(self): + self.database_bin.send_signal(jail.SIGTERM) + self.database_bin.wait() + + +class Neo: + """ + Knows how to start and stop neo4j. 
+ """ + def __init__(self, args, config, cpus=None): + self.log = logging.getLogger("NeoRunner") + argp = ArgumentParser("NeoArgumentParser") + argp.add_argument("--runner-bin", default=get_absolute_path( + "neo4j/bin/neo4j", "libs")) + argp.add_argument("--port", default="7687", + help="Database and client port") + argp.add_argument("--http-port", default="7474", + help="Database and client port") + self.log.info("Initializing Runner with arguments %r", args) + self.args, _ = argp.parse_known_args(args) + self.config = config + self.database_bin = jail.get_process() + if cpus: + self.database_bin.set_cpus(cpus) + + def start(self): + self.log.info("start") + + # create home directory + self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm") + + try: + os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"), + os.path.join(self.neo4j_home_path, "lib")) + neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf") + neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf") + os.mkdir(neo4j_conf_dir) + shutil.copyfile(self.config, neo4j_conf_file) + with open(neo4j_conf_file, "a") as f: + f.write("\ndbms.connector.bolt.listen_address=:" + + self.args.port + "\n") + f.write("\ndbms.connector.http.listen_address=:" + + self.args.http_port + "\n") + + # environment + cwd = os.path.dirname(self.args.runner_bin) + env = {"NEO4J_HOME": self.neo4j_home_path} + + self.database_bin.run(self.args.runner_bin, args=["console"], + env=env, timeout=600, cwd=cwd) + except: + shutil.rmtree(self.neo4j_home_path) + raise Exception("Couldn't run Neo4j!") + + wait_for_server(self.args.http_port, 2.0) + + def stop(self): + self.database_bin.send_signal(jail.SIGTERM) + self.database_bin.wait() + if os.path.exists(self.neo4j_home_path): + shutil.rmtree(self.neo4j_home_path) + + +class Postgres: + """ + Knows how to start and stop PostgreSQL. 
+ """ + def __init__(self, args, cpus): + self.log = logging.getLogger("PostgresRunner") + argp = ArgumentParser("PostgresArgumentParser") + argp.add_argument("--init-bin", default=get_absolute_path( + "postgresql/bin/initdb", "libs")) + argp.add_argument("--runner-bin", default=get_absolute_path( + "postgresql/bin/postgres", "libs")) + argp.add_argument("--port", default="5432", + help="Database and client port") + self.log.info("Initializing Runner with arguments %r", args) + self.args, _ = argp.parse_known_args(args) + self.username = "macro_benchmark" + self.database_bin = jail.get_process() + self.database_bin.set_cpus(cpus) + + def start(self): + self.log.info("start") + self.data_path = tempfile.mkdtemp(dir="/dev/shm") + init_args = ["-D", self.data_path, "-U", self.username] + self.database_bin.run_and_wait(self.args.init_bin, init_args) + + # args + runner_args = ["-D", self.data_path, "-c", "port=" + self.args.port, + "-c", "ssl=false", "-c", "max_worker_processes=1"] + + try: + self.database_bin.run(self.args.runner_bin, args=runner_args, + timeout=600) + except: + shutil.rmtree(self.data_path) + raise Exception("Couldn't run PostgreSQL!") + + wait_for_server(self.args.port) + + def stop(self): + self.database_bin.send_signal(jail.SIGTERM) + self.database_bin.wait() + if os.path.exists(self.data_path): + shutil.rmtree(self.data_path) diff --git a/tests/macro_benchmark/harness/groups/pokec/.gitignore b/tests/macro_benchmark/harness/groups/pokec/.gitignore new file mode 100644 index 000000000..cf7457bc3 --- /dev/null +++ b/tests/macro_benchmark/harness/groups/pokec/.gitignore @@ -0,0 +1 @@ +pokec_small.setup.cypher diff --git a/tests/macro_benchmark/harness/groups/pokec/pokec_small.config.json b/tests/macro_benchmark/harness/groups/pokec/pokec_small.config.json new file mode 100644 index 000000000..fcafa44ac --- /dev/null +++ b/tests/macro_benchmark/harness/groups/pokec/pokec_small.config.json @@ -0,0 +1,3 @@ +{ + "duration": 30 +} diff --git a/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json b/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json new file mode 100644 index 000000000..8cd666f00 --- /dev/null +++ b/tests/macro_benchmark/harness/groups/pokec/pokec_small.run.json @@ -0,0 +1,28 @@ +{ + "num_independent_nodes" : 4111, + "num_nodes" : 10000, + "independent_label": "User", + "read_probability": 0.5, + "queries" : [ + { + "query": "MATCH (n :User {id : $id})-[]-(m) RETURN AVG(n.age + m.age)", + "params" : [ + { + "name" : "id", + "low" : 1, + "high" : 10000 + } + ] + }, + { + "query": "MATCH (n :User {id : $id})-[]-(m)-[]-(k) RETURN AVG(n.age + m.age + k.age)", + "params" : [ + { + "name" : "id", + "low" : 1, + "high" : 10000 + } + ] + } + ] +} diff --git a/tests/macro_benchmark/harness/harness.py b/tests/macro_benchmark/harness/harness.py index 1aca7acad..e7073887d 100755 --- a/tests/macro_benchmark/harness/harness.py +++ b/tests/macro_benchmark/harness/harness.py @@ -9,12 +9,11 @@ import json import subprocess from argparse import ArgumentParser from collections import OrderedDict -from collections import defaultdict -import tempfile -import shutil -from statistics import median +from common import get_absolute_path +from query_suite import QuerySuite, QueryParallelSuite +from long_running_suite import LongRunningSuite -from perf import Perf +log = logging.getLogger(__name__) try: import jail @@ -23,35 +22,52 @@ except: import jail_faker as jail APOLLO = False -DIR_PATH = os.path.dirname(os.path.realpath(__file__)) -WALL_TIME = "wall_time" 
-CPU_TIME = "cpu_time" -log = logging.getLogger(__name__) +class Loader: + """ + Loads file contents. Supported types are: + .py - executable that prints out Cypher queries + .cypher - contains Cypher queries in textual form + .json - contains a configuration + + A QueryLoader object is callable. + A call to it returns a generator that yields loaded data + (Cypher queries, configuration). In that sense one + QueryLoader is reusable. The generator approach makes it possible + to generated different queries each time when executing a .py file. + """ + def __init__(self, file_path): + self.file_path = file_path + + def _queries(self, data): + """ Helper function for breaking down and filtering queries""" + for element in filter(lambda x: x is not None, + map(str.strip, data.replace("\n", " ").split(";"))): + yield element + + def __call__(self): + """ Yields queries found in the given file_path one by one """ + log.debug("Generating queries from file_path: %s", + self.file_path) + _, extension = os.path.splitext(self.file_path) + if extension == ".cypher": + with open(self.file_path) as f: + return self._queries(f.read()) + elif extension == ".py": + return self._queries(subprocess.check_output( + ["python3", self.file_path]).decode("ascii")) + elif extension == ".json": + with open(self.file_path) as f: + return [json.load(f)].__iter__() + else: + raise Exception("Unsupported filetype {} ".format(extension)) + + def __repr__(self): + return "(Loader<%s>)" % self.file_path -def get_absolute_path(path, base=""): - if base == "build": - extra = "../../../build" - elif base == "build_release": - extra = "../../../build_release" - elif base == "libs": - extra = "../../../libs" - elif base == "config": - extra = "../../../config" - else: - extra = "" - return os.path.normpath(os.path.join(DIR_PATH, extra, path)) - -def wait_for_server(port, delay=1.0): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port] - while subprocess.call(cmd) != 0: - time.sleep(0.5) - time.sleep(delay) - - -def load_scenarios(args): +def load_scenarios(args, known_keys, suite_groups): """ Scans through folder structure starting with groups_root and loads query scenarios. @@ -98,12 +114,11 @@ def load_scenarios(args): for config_file in config_files: log.debug("Processing config file %s", config_file) config_name = config_file.split(".")[-2] - config_dict[config_name] = QuerySuite.Loader( - os.path.join(base, config_file)) + config_dict[config_name] = Loader(os.path.join(base, config_file)) - # validate that the scenario does not contain any illegal - # keys (defense against typos in file naming) - unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS + # Validate that the scenario does not contain any illegal keys (defense + # against typos in file naming). + unknown_keys = set(config_dict) - known_keys if unknown_keys: raise Exception("Unknown QuerySuite config elements: '%r'" % unknown_keys) @@ -114,18 +129,20 @@ def load_scenarios(args): group_scenarios = OrderedDict() for group in dir_content(args.root, os.path.isdir): + if group not in suite_groups: continue log.info("Loading group: '%s'", group) group_scenarios[group] = [] + # Filter out hidden files: .gitignore, ... files = dir_content(os.path.join(args.root, group), - os.path.isfile) + lambda x: os.path.isfile(x) and os.path.basename(x)[0] != ".") - # process group default config + # Process group default config. 
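+        # (Group-wide defaults are read from files with a single dot in the
+        # name, e.g. "setup.cypher" provides the "setup" step for the whole
+        # group.)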
group_config = {} fill_config_dict(group_config, os.path.join(args.root, group), [f for f in files if f.count(".") == 1]) - # group files on scenario + # Group files on scenario. for scenario_name, scenario_files in itertools.groupby( filter(lambda f: f.count(".") == 2, sorted(files)), lambda x: x.split(".")[0]): @@ -141,436 +158,6 @@ def load_scenarios(args): return group_scenarios -class _QuerySuite: - """ - Executes a Query-based benchmark scenario. Query-based scenarios - consist of setup steps (Cypher queries) executed before the benchmark, - a single Cypher query that is benchmarked, and teardown steps - (Cypher queries) executed after the benchmark. - """ - # what the QuerySuite can work with - KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown", - "teardown", "common"} - FORMAT = ["{:>24}", "{:>28}", "{:>16}", "{:>18}", "{:>22}", - "{:>16}", "{:>16}"] - FULL_FORMAT = "".join(FORMAT) + "\n" - summary = FULL_FORMAT.format( - "group_name", "scenario_name", "parsing_time", - "planning_time", "plan_execution_time", - WALL_TIME, CPU_TIME) - - def __init__(self, args): - if not APOLLO: - self.perf = Perf() - argp = ArgumentParser(description=__doc__) - argp.add_argument("--perf", help="Run perf on memgraph binary.", - action="store_true") - args, _ = argp.parse_known_args(args) - self.perf = Perf() if args.perf else None - - class Loader: - """ - Loads file contents. Supported types are: - .py - executable that prints out Cypher queries - .cypher - contains Cypher queries in textual form - .json - contains a configuration - - A QueryLoader object is callable. - A call to it returns a generator that yields loaded data - (Cypher queries, configuration). In that sense one - QueryLoader is reusable. The generator approach makes it possible - to generated different queries each time when executing a .py file. 
- """ - def __init__(self, file_path): - self.file_path = file_path - - def _queries(self, data): - """ Helper function for breaking down and filtering queries""" - for element in filter( - None, map(str.strip, data.replace("\n", " ").split(";"))): - yield element - - def __call__(self): - """ Yields queries found in the given file_path one by one """ - log.debug("Generating queries from file_path: %s", - self.file_path) - _, extension = os.path.splitext(self.file_path) - if extension == ".cypher": - with open(self.file_path) as f: - return self._queries(f.read()) - elif extension == ".py": - return self._queries(subprocess.check_output( - ["python3", self.file_path]).decode("ascii")) - elif extension == ".json": - with open(self.file_path) as f: - return [json.load(f)].__iter__() - else: - raise Exception("Unsupported filetype {} ".format(extension)) - - def __repr__(self): - return "(QuerySuite.Loader<%s>)" % self.file_path - - def run(self, scenario, group_name, scenario_name, runner): - log.debug("QuerySuite.run() with scenario: %s", scenario) - scenario_config = scenario.get("config") - scenario_config = next(scenario_config()) if scenario_config else {} - - def execute(config_name, num_client_workers=1): - queries = scenario.get(config_name) - start_time = time.time() - if queries: - r_val = runner.execute(queries(), num_client_workers) - else: - r_val = None - log.info("\t%s done in %.2f seconds" % (config_name, - time.time() - start_time)) - return r_val - - def add_measurement(dictionary, iteration, key): - if key in dictionary: - measurement = {"target": key, - "value": float(dictionary[key]), - "unit": "s", - "type": "time", - "iteration": iteration} - measurements.append(measurement) - try: - measurement_lists[key].append(float(dictionary[key])) - except: - pass - - measurements = [] - - measurement_lists = defaultdict(list) - - # Run the whole test 3 times because memgraph is sometimes - # consistently slow and with this hack we get a good median - for i in range(3): - pid = runner.start() - execute("setup") - - # warmup phase - for _ in range(min(scenario_config.get("iterations", 1), - scenario_config.get("warmup", 2))): - execute("itersetup") - execute("run", scenario_config.get("num_client_workers", 1)) - execute("iterteardown") - - if self.perf: - self.perf.start(pid) - - # TODO per scenario/run runner configuration - num_iterations = scenario_config.get("iterations", 1) - for iteration in range(num_iterations): - # TODO if we didn't have the itersetup it would be trivial - # to move iteration to the bolt_client script, so we would not - # have to start and stop the client for each iteration, it would - # most likely run faster - execute("itersetup") - run_result = execute("run", - scenario_config.get("num_client_workers", 1)) - add_measurement(run_result, iteration, WALL_TIME) - add_measurement(run_result, iteration, CPU_TIME) - for measurement in ["parsing_time", - "plan_execution_time", - "planning_time"] : - for i in range(len(run_result.get("metadatas", []))): - add_measurement(run_result["metadatas"][i], iteration, - measurement) - execute("iterteardown") - - if self.perf: - self.perf.stop() - - # TODO value outlier detection and warning across iterations - execute("teardown") - runner.stop() - - self.append_scenario_summary(group_name, scenario_name, - measurement_lists, num_iterations) - return measurements - - def append_scenario_summary(self, group_name, scenario_name, - measurement_lists, num_iterations): - self.summary += self.FORMAT[0].format(group_name) - 
self.summary += self.FORMAT[1].format(scenario_name) - for i, key in enumerate(("parsing_time", "planning_time", - "plan_execution_time", WALL_TIME, CPU_TIME)): - if key not in measurement_lists: - time = "-" - else: - # Median is used instead of avg to avoid effect of outliers. - time = "{:.10f}".format(median(measurement_lists[key])) - self.summary += self.FORMAT[i + 2].format(time) - self.summary += "\n" - - def runners(self): - """ Which runners can execute a QuerySuite scenario """ - assert False, "This is a base class, use one of derived suites" - - def groups(self): - """ Which groups can be executed by a QuerySuite scenario """ - assert False, "This is a base class, use one of derived suites" - - -class QuerySuite(_QuerySuite): - def __init__(self, args): - _QuerySuite.__init__(self, args) - - def runners(self): - return ["MemgraphRunner", "NeoRunner"] - - def groups(self): - return ["1000_create", "unwind_create", "match", "dense_expand", - "expression", "aggregation", "return", "update", "delete"] - - -class QueryParallelSuite(_QuerySuite): - def __init__(self, args): - _QuerySuite.__init__(self, args) - - def runners(self): - return ["MemgraphRunner", "NeoRunner"] - - def groups(self): - return ["aggregation_parallel", "create_parallel"] - - -# Database wrappers. - -class Memgraph: - """ - Knows how to start and stop memgraph. - """ - def __init__(self, args, cpus): - self.log = logging.getLogger("MemgraphRunner") - argp = ArgumentParser("MemgraphArgumentParser", add_help=False) - argp.add_argument("--runner-bin", - default=get_absolute_path("memgraph", "build")) - argp.add_argument("--runner-config", - default=get_absolute_path("benchmarking_latency.conf", "config")) - argp.add_argument("--port", default="7687", - help="Database and client port") - self.log.info("Initializing Runner with arguments %r", args) - self.args, _ = argp.parse_known_args(args) - self.database_bin = jail.get_process() - self.database_bin.set_cpus(cpus) - - def start(self): - self.log.info("start") - env = {"MEMGRAPH_CONFIG": self.args.runner_config} - database_args = ["--port", self.args.port] - - # find executable path - runner_bin = self.args.runner_bin - if not os.path.exists(runner_bin): - # Apollo builds both debug and release binaries on diff - # so we need to use the release binary if the debug one - # doesn't exist - runner_bin = get_absolute_path("memgraph", "build_release") - - # start memgraph - self.database_bin.run(runner_bin, database_args, env=env, timeout=600) - wait_for_server(self.args.port) - return self.database_bin.get_pid() if not APOLLO else None - - def stop(self): - self.database_bin.send_signal(jail.SIGTERM) - self.database_bin.wait() - - -class Neo: - def __init__(self, args, cpus): - self.log = logging.getLogger("NeoRunner") - argp = ArgumentParser("NeoArgumentParser", add_help=False) - argp.add_argument("--runner-bin", default=get_absolute_path( - "neo4j/bin/neo4j", "libs")) - argp.add_argument("--runner-config", - default=get_absolute_path("config/neo4j.conf")) - argp.add_argument("--port", default="7687", - help="Database and client port") - argp.add_argument("--http-port", default="7474", - help="Database and client port") - self.log.info("Initializing Runner with arguments %r", args) - self.args, _ = argp.parse_known_args(args) - self.database_bin = jail.get_process() - self.database_bin.set_cpus(cpus) - - def start(self): - self.log.info("start") - - # create home directory - self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm") - - try: - 
os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"), - os.path.join(self.neo4j_home_path, "lib")) - neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf") - neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf") - os.mkdir(neo4j_conf_dir) - shutil.copyfile(self.args.runner_config, neo4j_conf_file) - with open(neo4j_conf_file, "a") as f: - f.write("\ndbms.connector.bolt.listen_address=:" + - self.args.port + "\n") - f.write("\ndbms.connector.http.listen_address=:" + - self.args.http_port + "\n") - - # environment - cwd = os.path.dirname(self.args.runner_bin) - env = {"NEO4J_HOME": self.neo4j_home_path} - - self.database_bin.run(self.args.runner_bin, args=["console"], - env=env, timeout=600, cwd=cwd) - except: - shutil.rmtree(self.neo4j_home_path) - raise Exception("Couldn't run Neo4j!") - - wait_for_server(self.args.http_port, 2.0) - return self.database_bin.get_pid() if not APOLLO else None - - def stop(self): - self.database_bin.send_signal(jail.SIGTERM) - self.database_bin.wait() - if os.path.exists(self.neo4j_home_path): - shutil.rmtree(self.neo4j_home_path) - - -class Postgres: - """ - Knows how to start and stop PostgreSQL. - """ - def __init__(self, args, cpus): - self.log = logging.getLogger("PostgresRunner") - argp = ArgumentParser("PostgresArgumentParser", add_help=False) - argp.add_argument("--init-bin", default=get_absolute_path( - "postgresql/bin/initdb", "libs")) - argp.add_argument("--runner-bin", default=get_absolute_path( - "postgresql/bin/postgres", "libs")) - argp.add_argument("--port", default="5432", - help="Database and client port") - self.log.info("Initializing Runner with arguments %r", args) - self.args, _ = argp.parse_known_args(args) - self.username = "macro_benchmark" - self.database_bin = jail.get_process() - self.database_bin.set_cpus(cpus) - - def start(self): - self.log.info("start") - self.data_path = tempfile.mkdtemp(dir="/dev/shm") - init_args = ["-D", self.data_path, "-U", self.username] - self.database_bin.run_and_wait(self.args.init_bin, init_args) - - # args - runner_args = ["-D", self.data_path, "-c", "port=" + self.args.port, - "-c", "ssl=false", "-c", "max_worker_processes=1"] - - try: - self.database_bin.run(self.args.runner_bin, args=runner_args, - timeout=600) - except: - shutil.rmtree(self.data_path) - raise Exception("Couldn't run PostgreSQL!") - - wait_for_server(self.args.port) - return self.database_bin.get_pid() if not APOLLO else None - - def stop(self): - self.database_bin.send_signal(jail.SIGTERM) - self.database_bin.wait() - if os.path.exists(self.data_path): - shutil.rmtree(self.data_path) - - -class _HarnessClientRunner: - """ - Knows how to start and stop database (backend) some client frontend (bolt), - and execute a cypher query. - Execution returns benchmarking data (execution times, memory - usage etc). - Inherited class should implement start method and initialise database_bin - and bolt_client members of type Process. 
- """ - def __init__(self, args, database, cpus=None): - if cpus is None: cpus = [2, 3] - self.log = logging.getLogger("_HarnessClientRunner") - self.database = database - argp = ArgumentParser("RunnerArgumentParser", add_help=False) - self.args, _ = argp.parse_known_args() - self.bolt_client = jail.get_process() - self.bolt_client.set_cpus(cpus) - - def start(self): - self.database.start() - - def execute(self, queries, num_client_workers): - self.log.debug("execute('%s')", str(queries)) - - client_path = "tests/macro_benchmark/harness_client" - client = get_absolute_path(client_path, "build") - if not os.path.exists(client): - # Apollo builds both debug and release binaries on diff - # so we need to use the release client if the debug one - # doesn't exist - client = get_absolute_path(client_path, "build_release") - - queries_fd, queries_path = tempfile.mkstemp() - try: - queries_file = os.fdopen(queries_fd, "w") - queries_file.write("\n".join(queries)) - queries_file.close() - except: - queries_file.close() - os.remove(queries_path) - raise Exception("Writing queries to temporary file failed") - - output_fd, output = tempfile.mkstemp() - os.close(output_fd) - - client_args = ["--port", self.database.args.port, - "--num-workers", str(num_client_workers), - "--output", output] - - cpu_time_start = self.database.database_bin.get_usage()["cpu"] - # TODO make the timeout configurable per query or something - return_code = self.bolt_client.run_and_wait( - client, client_args, timeout=600, stdin=queries_path) - cpu_time_end = self.database.database_bin.get_usage()["cpu"] - os.remove(queries_path) - if return_code != 0: - with open(self.bolt_client.get_stderr()) as f: - stderr = f.read() - self.log.error("Error while executing queries '%s'. " - "Failed with return_code %d and stderr:\n%s", - str(queries), return_code, stderr) - raise Exception("BoltClient execution failed") - - with open(output) as f: - data = json.loads(f.read()) - data[CPU_TIME] = cpu_time_end - cpu_time_start - - os.remove(output) - return data - - def stop(self): - self.log.info("stop") - self.bolt_client.wait() - self.database.stop() - - -class MemgraphRunner(_HarnessClientRunner): - def __init__(self, args, client_cpus=None, database_cpus=None): - if database_cpus is None: database_cpus = [1] - database = Memgraph(args, database_cpus) - super(MemgraphRunner, self).__init__(args, database, cpus=client_cpus) - - -class NeoRunner(_HarnessClientRunner): - def __init__(self, args, client_cpus=None, database_cpus=None): - if database_cpus is None: database_cpus = [1] - database = Neo(args, database_cpus) - super(NeoRunner, self).__init__(args, database, cpus=client_cpus) - - def main(): argp = ArgumentParser(description=__doc__) # positional, mandatory args @@ -600,7 +187,8 @@ def main(): # Create suites. suites = {"QuerySuite": QuerySuite, - "QueryParallelSuite": QueryParallelSuite} + "QueryParallelSuite": QueryParallelSuite, + "LongRunningSuite": LongRunningSuite} if args.suite not in suites: raise Exception( "Suite '{}' isn't registered. Registered suites are: {}".format( @@ -608,14 +196,15 @@ def main(): suite = suites[args.suite](remaining_args) # Load scenarios. - group_scenarios = load_scenarios(remaining_args) + group_scenarios = load_scenarios( + remaining_args, suite.KNOWN_KEYS, suite.groups()) log.info("Loaded %d groups, with a total of %d scenarios", len(group_scenarios), sum([len(x) for x in group_scenarios.values()])) - # Create runners. 
- runners = {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner} - if args.runner not in suite.runners(): + # Create runner. + runners = suite.runners() + if args.runner not in runners: raise Exception("Runner '{}' not registered for suite '{}'".format( args.runner, args.suite)) runner = runners[args.runner](remaining_args) diff --git a/tests/macro_benchmark/harness/jail_faker.py b/tests/macro_benchmark/harness/jail_faker.py index c52fd840c..7dd528a3b 100644 --- a/tests/macro_benchmark/harness/jail_faker.py +++ b/tests/macro_benchmark/harness/jail_faker.py @@ -1,4 +1,5 @@ #!/usr/bin/python3 + import atexit import json import os @@ -12,21 +13,17 @@ import uuid from signal import * - SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage") - class ProcessException(Exception): pass - class StorageException(Exception): pass - class Process: def __init__(self, tid): self._tid = tid @@ -36,7 +33,8 @@ class Process: self._usage = {} self._files = [] - def run(self, binary, args = None, env = None, timeout = 120, stdin = "/dev/null", cwd = "."): + def run(self, binary, args=None, env=None, timeout=120, + stdin="/dev/null", cwd="."): if args is None: args = [] if env is None: env = {} # don't start a new process if one is already running @@ -59,8 +57,8 @@ class Process: self._timeout = timeout # start process - self._proc = subprocess.Popen(exe, env = env, cwd = cwd, - stdin = open(stdin, "r")) + self._proc = subprocess.Popen(exe, env=env, cwd=cwd, + stdin=open(stdin, "r")) def run_and_wait(self, *args, **kwargs): check = kwargs.pop("check", True) @@ -92,7 +90,7 @@ class Process: return self._usage # this is implemented only in the real API - def set_cpus(self, cpus, hyper = True): + def set_cpus(self, cpus, hyper=True): s = "out" if not hyper else "" sys.stderr.write("WARNING: Trying to set cpus for {} to " "{} with{} hyperthreading!\n".format(str(self), cpus, s)) @@ -113,7 +111,7 @@ class Process: raise ProcessException return self._proc.pid - def _set_usage(self, val, name, only_value = False): + def _set_usage(self, val, name, only_value=False): self._usage[name] = val if only_value: return maxname = "max_" + name @@ -138,10 +136,12 @@ class Process: except: return # for a description of these fields see: man proc; man times - cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec, data_stat[13:17])) + cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec, + data_stat[13:17])) self._set_usage(cpu_time, "cpu", only_value = True) self._set_usage(int(data_stat[19]), "threads") - mem_vm, mem_res, mem_shr = map(lambda x: int(x) * self._page_size // 1024, data_statm[:3]) + mem_vm, mem_res, mem_shr = map( + lambda x: int(x) * self._page_size // 1024, data_statm[:3]) self._set_usage(mem_res, "memory") def _watchdog(self): @@ -169,13 +169,14 @@ def _usage_updater(): proc._do_background_tasks() time.sleep(0.1) -_thread = threading.Thread(target = _usage_updater, daemon = True) +_thread = threading.Thread(target=_usage_updater, daemon=True) _thread.start() if not os.path.exists(STORAGE_DIR): os.mkdir(STORAGE_DIR) -_storage_name = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json") +_storage_name = os.path.join( + STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json") _storage_file = open(_storage_name, "w") @atexit.register diff --git a/tests/macro_benchmark/harness/long_running_suite.py b/tests/macro_benchmark/harness/long_running_suite.py new file mode 100644 index 000000000..642ad7279 --- /dev/null +++ 
b/tests/macro_benchmark/harness/long_running_suite.py
@@ -0,0 +1,129 @@
+import logging
+import os
+import time
+import itertools
+import json
+from argparse import ArgumentParser
+from collections import defaultdict
+from statistics import median
+from common import get_absolute_path
+from databases import Memgraph, Neo
+from clients import QueryClient, LongRunningClient
+
+log = logging.getLogger(__name__)
+
+
+class LongRunningSuite:
+    KNOWN_KEYS = {"config", "setup", "run"}
+
+    def __init__(self, args):
+        argp = ArgumentParser("LongRunningSuiteArgumentParser")
+        argp.add_argument("--num-client-workers", default=4)
+        self.args, _ = argp.parse_known_args(args)
+        pass
+
+    def run(self, scenario, group_name, scenario_name, runner):
+        runner.start()
+        # This suite allows empty lines in setup. Those lines separate query
+        # groups. It is guaranteed that groups will be executed sequentially,
+        # but queries in each group are possibly executed concurrently.
+        query_groups = [[]]
+        for query in scenario.get("setup")():
+            if query == "":
+                query_groups.append([])
+            else:
+                query_groups[-1].append(query)
+        if query_groups[-1] == []:
+            query_groups.pop()
+
+        log.info("Executing {} query groups in setup"
+                .format(len(query_groups)))
+
+        for i, queries in enumerate(query_groups):
+            start_time = time.time()
+            # TODO: number of threads configurable
+            runner.setup(queries, self.args.num_client_workers)
+            log.info("\t{}. group imported in {:.2f} seconds".format(
+                i + 1, time.time() - start_time))
+
+        config = next(scenario.get("config")())
+        duration = config["duration"]
+        log.info("Executing run for {} seconds with {} client workers".format(
+            duration, self.args.num_client_workers))
+        # TODO: number of threads configurable
+        results = runner.run(next(scenario.get("run")()), duration,
+                self.args.num_client_workers)
+
+        runner.stop()
+
+        measurements = []
+        for result in results:
+            print(result["num_executed_queries"], result["elapsed_time"])
+            # TODO: Revise this.
+            measurements.append({
+                "target": "throughput",
+                "value": result["num_executed_queries"] / result["elapsed_time"],
+                "unit": "queries per second",
+                "type": "throughput"})
+        self.summary = "Throughput: " + str(measurements[-1]["value"])
+        return measurements
+
+    def runners(self):
+        return { "MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner }
+
+    def groups(self):
+        return ["pokec"]
+
+
+class _LongRunningRunner:
+    def __init__(self, args, database):
+        self.log = logging.getLogger("_LongRunningRunner")
+        self.database = database
+        self.query_client = QueryClient(args)
+        self.long_running_client = LongRunningClient(args)
+
+    def start(self):
+        self.database.start()
+
+    def setup(self, queries, num_client_workers):
+        return self.query_client(queries, self.database, num_client_workers)
+
+    def run(self, config, duration, num_client_workers):
+        return self.long_running_client(
+            config, self.database, duration, num_client_workers)
+
+    def stop(self):
+        self.log.info("stop")
+        self.database.stop()
+
+
+class MemgraphRunner(_LongRunningRunner):
+    """
+    Configures memgraph database for LongRunningSuite execution.
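+
+    Rough call sequence when driven by the harness (the query string, worker
+    count and config dict below are placeholders; config corresponds to a
+    parsed *.run.json):
+
+        runner = MemgraphRunner(["--num-workers", "8"])
+        runner.start()
+        runner.setup(["MATCH (n) RETURN count(n)"], num_client_workers=4)
+        results = runner.run(config, duration=30, num_client_workers=4)
+        runner.stop()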
+ """ + def __init__(self, args): + argp = ArgumentParser("MemgraphRunnerArgumentParser") + # TODO: change default config + argp.add_argument("--runner-config", default=get_absolute_path( + "benchmarking_throughput.conf", "config"), + help="Path to memgraph config") + argp.add_argument("--num-workers", help="Number of workers") + self.args, remaining_args = argp.parse_known_args(args) + database = Memgraph(remaining_args, self.args.runner_config, + self.args.num_workers) + super(MemgraphRunner, self).__init__(remaining_args, database) + + +class NeoRunner(_LongRunningRunner): + """ + Configures neo4j database for QuerySuite execution. + """ + def __init__(self, args): + argp = ArgumentParser("NeoRunnerArgumentParser") + argp.add_argument("--runner-config", + default=get_absolute_path( + "config/neo4j_long_running.conf"), + help="Path to neo config file") + self.args, remaining_args = argp.parse_known_args(args) + database = Neo(remaining_args, self.args.runner_config, [1]) + super(NeoRunner, self).__init__(remaining_args, database) diff --git a/tests/macro_benchmark/harness/perf.py b/tests/macro_benchmark/harness/perf.py deleted file mode 100644 index 145573813..000000000 --- a/tests/macro_benchmark/harness/perf.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -import os -from pathlib import Path -import subprocess -import signal - - -class Perf(): - def __init__(self): - self.first = True - self.max_frequency = Path( - "/proc/sys/kernel/perf_event_max_sample_rate").read_text().strip() - # Check if lbr is available. - status = subprocess.call( - "perf record --call-graph=lbr -a -g sleep 0.0000001".split()) - self.call_graph_technique = "lbr" if not status else "dwarf" - - - def start(self, pid, frequency=None): - if frequency is None: frequency = self.max_frequency - append = "-A" if not self.first else "" - self.first = False - perf_command = "perf record --call-graph={} -F {} -p {} -g {}".format( - self.call_graph_technique, frequency, pid, append).split() - self.perf_proc = subprocess.Popen(perf_command) - - - def stop(self): - self.perf_proc.send_signal(signal.SIGINT) - self.perf_proc.wait() diff --git a/tests/macro_benchmark/harness/query_suite.py b/tests/macro_benchmark/harness/query_suite.py new file mode 100644 index 000000000..b87b4883a --- /dev/null +++ b/tests/macro_benchmark/harness/query_suite.py @@ -0,0 +1,209 @@ +import logging +import os +import time +import itertools +import json +from argparse import ArgumentParser +from collections import defaultdict +import tempfile +from statistics import median +from common import get_absolute_path, WALL_TIME, CPU_TIME +from databases import Memgraph, Neo +from clients import QueryClient + +log = logging.getLogger(__name__) + + +class _QuerySuite: + """ + Executes a Query-based benchmark scenario. Query-based scenarios + consist of setup steps (Cypher queries) executed before the benchmark, + a single Cypher query that is benchmarked, and teardown steps + (Cypher queries) executed after the benchmark. 
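+
+    Scenario steps are assembled by load_scenarios() in harness.py from files
+    named <scenario>.<step>.<extension> inside a group directory; the file
+    names below are only an illustration:
+
+        groups/match/by_id.setup.cypher      # executed once, before timing
+        groups/match/by_id.run.cypher        # the benchmarked query
+        groups/match/by_id.teardown.cypher   # executed once, at the end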
+ """ + # what the QuerySuite can work with + KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown", + "teardown", "common"} + FORMAT = ["{:>24}", "{:>28}", "{:>16}", "{:>18}", "{:>22}", + "{:>16}", "{:>16}"] + FULL_FORMAT = "".join(FORMAT) + "\n" + summary = FULL_FORMAT.format( + "group_name", "scenario_name", "parsing_time", + "planning_time", "plan_execution_time", + WALL_TIME, CPU_TIME) + + def __init__(self, args): + pass + + def run(self, scenario, group_name, scenario_name, runner): + log.debug("QuerySuite.run() with scenario: %s", scenario) + scenario_config = scenario.get("config") + scenario_config = next(scenario_config()) if scenario_config else {} + + def execute(config_name, num_client_workers=1): + queries = scenario.get(config_name) + start_time = time.time() + if queries: + r_val = runner.execute(queries(), num_client_workers) + else: + r_val = None + log.info("\t%s done in %.2f seconds" % (config_name, + time.time() - start_time)) + return r_val + + def add_measurement(dictionary, iteration, key): + if key in dictionary: + measurement = {"target": key, + "value": float(dictionary[key]), + "unit": "s", + "type": "time", + "iteration": iteration} + measurements.append(measurement) + try: + measurement_lists[key].append(float(dictionary[key])) + except: + pass + + measurements = [] + + measurement_lists = defaultdict(list) + + # Run the whole test 3 times because memgraph is sometimes + # consistently slow and with this hack we get a good median + for i in range(3): + runner.start() + execute("setup") + + # warmup phase + for _ in range(min(scenario_config.get("iterations", 1), + scenario_config.get("warmup", 2))): + execute("itersetup") + execute("run", scenario_config.get("num_client_workers", 1)) + execute("iterteardown") + + # TODO per scenario/run runner configuration + num_iterations = scenario_config.get("iterations", 1) + for iteration in range(num_iterations): + # TODO if we didn't have the itersetup it would be trivial + # to move iteration to the bolt_client script, so we would not + # have to start and stop the client for each iteration, it would + # most likely run faster + execute("itersetup") + run_result = execute("run", + scenario_config.get("num_client_workers", 1)) + add_measurement(run_result, iteration, WALL_TIME) + add_measurement(run_result, iteration, CPU_TIME) + for measurement in ["parsing_time", + "plan_execution_time", + "planning_time"] : + for i in range(len(run_result.get("metadatas", []))): + add_measurement(run_result["metadatas"][i], iteration, + measurement) + execute("iterteardown") + + # TODO value outlier detection and warning across iterations + execute("teardown") + runner.stop() + + self.append_scenario_summary(group_name, scenario_name, + measurement_lists, num_iterations) + return measurements + + def append_scenario_summary(self, group_name, scenario_name, + measurement_lists, num_iterations): + self.summary += self.FORMAT[0].format(group_name) + self.summary += self.FORMAT[1].format(scenario_name) + for i, key in enumerate(("parsing_time", "planning_time", + "plan_execution_time", WALL_TIME, CPU_TIME)): + if key not in measurement_lists: + time = "-" + else: + # Median is used instead of avg to avoid effect of outliers. 
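+                # (For example, median([1.02, 1.05, 9.80]) is 1.05, while the
+                # mean would be pulled up to ~3.96 by the single slow run.)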
+ time = "{:.10f}".format(median(measurement_lists[key])) + self.summary += self.FORMAT[i + 2].format(time) + self.summary += "\n" + + def runners(self): + """ Which runners can execute a QuerySuite scenario """ + assert False, "This is a base class, use one of derived suites" + + def groups(self): + """ Which groups can be executed by a QuerySuite scenario """ + assert False, "This is a base class, use one of derived suites" + + +class QuerySuite(_QuerySuite): + def __init__(self, args): + _QuerySuite.__init__(self, args) + + def runners(self): + return {"MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner} + + def groups(self): + return ["1000_create", "unwind_create", "match", "dense_expand", + "expression", "aggregation", "return", "update", "delete"] + + +class QueryParallelSuite(_QuerySuite): + def __init__(self, args): + _QuerySuite.__init__(self, args) + + def runners(self): + # TODO: We should use different runners which will use more threads. + return {"MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner} + + def groups(self): + return ["aggregation_parallel", "create_parallel"] + + +class _QueryRunner: + """ + Knows how to start and stop database (backend) some client frontend (bolt), + and execute a cypher query. + Execution returns benchmarking data (execution times, memory + usage etc). + """ + def __init__(self, args, database): + self.log = logging.getLogger("_HarnessClientRunner") + self.database = database + self.query_client = QueryClient(args, [2, 3]) + + def start(self): + self.database.start() + + def execute(self, queries, num_client_workers): + return self.query_client(queries, self.database, num_client_workers) + + def stop(self): + self.log.info("stop") + self.database.stop() + + +class MemgraphRunner(_QueryRunner): + """ + Configures memgraph database for QuerySuite execution. + """ + def __init__(self, args): + argp = ArgumentParser("MemgraphRunnerArgumentParser") + argp.add_argument("--runner-config", default=get_absolute_path( + "benchmarking_latency.conf", "config"), + help="Path to memgraph config") + argp.add_argument("--num-workers", help="Number of workers") + self.args, remaining_args = argp.parse_known_args(args) + database = Memgraph(remaining_args, self.args.runner_config, + self.args.num_workers, [1]) + super(MemgraphRunner, self).__init__(remaining_args, database) + + +class NeoRunner(_QueryRunner): + """ + Configures neo4j database for QuerySuite execution. + """ + def __init__(self, args): + argp = ArgumentParser("NeoRunnerArgumentParser") + argp.add_argument("--runner-config", + default=get_absolute_path("config/neo4j.conf"), + help="Path to neo config file") + self.args, remaining_args = argp.parse_known_args(args) + database = Neo(remaining_args, self.args.runner_config, [1]) + super(NeoRunner, self).__init__(remaining_args, database) diff --git a/tests/macro_benchmark/harness/results/.gitignore b/tests/macro_benchmark/harness/results/.gitignore deleted file mode 100644 index d6b7ef32c..000000000 --- a/tests/macro_benchmark/harness/results/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!.gitignore diff --git a/tools/apollo/build_debug b/tools/apollo/build_debug index 7d6f155c5..0e9dfaf0f 100644 --- a/tools/apollo/build_debug +++ b/tools/apollo/build_debug @@ -17,7 +17,7 @@ mkdir build_release cd build_release cmake -DCMAKE_BUILD_TYPE=release .. 
-TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark__harness_client +TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark cd ../tools/apollo diff --git a/tools/apollo/build_diff b/tools/apollo/build_diff index ac8f223a0..825ef85d4 100644 --- a/tools/apollo/build_diff +++ b/tools/apollo/build_diff @@ -21,7 +21,7 @@ mkdir build_release cd build_release cmake -DCMAKE_BUILD_TYPE=release .. -TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark__harness_client +TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark cd ../../parent @@ -30,7 +30,7 @@ TIMEOUT=600 ./init cd build cmake -DCMAKE_BUILD_TYPE=release .. -TIMEOUT=1000 make -j$THREADS memgraph_link_target parent__macro_benchmark__harness_client +TIMEOUT=1000 make -j$THREADS memgraph_link_target parent__macro_benchmark cd ../../memgraph/tools/apollo
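
Note on the pokec workload definition: the sketch below shows one plausible way
a *.run.json file such as pokec_small.run.json can be consumed. Uniform sampling
of each parameter from its [low, high] range is an assumption about the client
(the authoritative logic lives in clients/long_running_client.cpp), not
something guaranteed here.

    import json
    import random

    def next_query(workload):
        # Pick one of the configured templates and draw every declared
        # parameter uniformly from its [low, high] range (assumed semantics).
        template = random.choice(workload["queries"])
        params = {p["name"]: random.randint(p["low"], p["high"])
                  for p in template["params"]}
        return template["query"], params

    with open("pokec_small.run.json") as f:
        workload = json.load(f)
    query, params = next_query(workload)
    # query  -> e.g. "MATCH (n :User {id : $id})-[]-(m) RETURN AVG(n.age + m.age)"
    # params -> e.g. {"id": 4111}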