Start work on parallel benchmark

Summary: First version of our benchmark

Reviewers: florijan, buda

Reviewed By: florijan

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D740
Mislav Bradac 2017-09-12 15:25:43 +02:00
parent 682fced81b
commit 7e99e93e47
31 changed files with 1530 additions and 646 deletions

View File

@ -30,7 +30,8 @@ BASE_FLAGS = [
'-I./libs/antlr4/runtime/Cpp/runtime/src',
'-I./build/libs/gflags/include',
'-I./experimental/distributed/src',
'-I./experimental/distributed/libs/cereal/include'
'-I./experimental/distributed/libs/cereal/include',
'-I./libs/postgresql/include'
]
SOURCE_EXTENSIONS = [

View File

@ -0,0 +1,37 @@
# MEMGRAPH DEFAULT BENCHMARKING CONFIG
# NOTE: all paths are relative to the run folder
# (where the executable is run)
# directory for the generated code that will be compiled
--compile-directory=compiled
# path to the template (cpp) used for code generation
--template-cpp-path=template/plan_template_cpp
# path to the folder with snapshots
--snapshot-directory=snapshots
# snapshot cycle interval
# if set to -1 the snapshooter will not run
--snapshot-cycle-sec=-1
# maximum allowed query execution time
# if set to -1 the execution time is unlimited
--query_execution_time_sec=-1
# snapshot creation on db exit is disabled
--snapshot-on-db-exit=false
# max number of snapshots kept on disk at any point
# if set to -1 the number of snapshots is unlimited
--max-retained-snapshots=-1
# by default the query engine runs in interpret mode
--interpret=true
# database recovery is disabled by default
--recover-on-startup=false
# use ast caching
--ast-cache=true
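
The benchmark runner (runners.py, later in this diff) passes a config like this to memgraph through the MEMGRAPH_CONFIG environment variable. A minimal sketch of that launch step, with the binary and config paths as hypothetical placeholders:

import os
import subprocess

# Paths below are placeholders; the harness resolves real ones with get_absolute_path().
env = dict(os.environ, MEMGRAPH_CONFIG="config/benchmarking_default.conf")
memgraph = subprocess.Popen(["build/memgraph", "--port", "7687"], env=env)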

View File

@ -100,3 +100,11 @@ wget -nv http://deps.memgraph.io/postgresql-9.6.5-1-linux-x64-binaries.tar.gz -O
tar -xzf postgres.tar.gz
mv pgsql postgresql
rm postgres.tar.gz
# nlohmann json
# We wget the header instead of cloning the repo since the repo is huge (lots of test data).
# We use the head from Sep 1, 2017 instead of the last release since the last release was a long time ago.
mkdir json
cd json
wget "https://raw.githubusercontent.com/nlohmann/json/91e003285312167ad8365f387438ea371b465a7e/src/json.hpp"
cd ..

View File

@ -46,8 +46,9 @@ struct QueryData {
template <typename Socket>
class Client {
public:
Client(Socket &&socket, std::string &username, std::string &password,
std::string client_name = "")
Client(Socket &&socket, const std::string &username,
const std::string &password,
const std::string &client_name = "memgraph-bolt/0.0.1")
: socket_(std::move(socket)) {
DLOG(INFO) << "Sending handshake";
if (!socket_.Write(kPreamble, sizeof(kPreamble))) {
@ -68,10 +69,6 @@ class Client {
}
buffer_.Shift(sizeof(kProtocol));
if (client_name == "") {
client_name = "memgraph-bolt/0.0.1";
}
DLOG(INFO) << "Sending init message";
if (!encoder_.MessageInit(client_name, {{"scheme", "basic"},
{"principal", username},

View File

@ -7,11 +7,11 @@
namespace io::network {
AddrInfo::AddrInfo(struct addrinfo* info) : info(info) {}
AddrInfo::AddrInfo(struct addrinfo *info) : info(info) {}
AddrInfo::~AddrInfo() { freeaddrinfo(info); }
AddrInfo AddrInfo::Get(const char* addr, const char* port) {
AddrInfo AddrInfo::Get(const char *addr, const char *port) {
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
@ -19,7 +19,7 @@ AddrInfo AddrInfo::Get(const char* addr, const char* port) {
hints.ai_socktype = SOCK_STREAM; // TCP socket
hints.ai_flags = AI_PASSIVE;
struct addrinfo* result;
struct addrinfo *result;
auto status = getaddrinfo(addr, port, &hints, &result);
if (status != 0) throw NetworkError(gai_strerror(status));
@ -27,5 +27,5 @@ AddrInfo AddrInfo::Get(const char* addr, const char* port) {
return AddrInfo(result);
}
AddrInfo::operator struct addrinfo*() { return info; }
AddrInfo::operator struct addrinfo *() { return info; }
}

View File

@ -7,16 +7,16 @@ namespace io::network {
* see: man 3 getaddrinfo
*/
class AddrInfo {
AddrInfo(struct addrinfo* info);
AddrInfo(struct addrinfo *info);
public:
~AddrInfo();
static AddrInfo Get(const char* addr, const char* port);
static AddrInfo Get(const char *addr, const char *port);
operator struct addrinfo*();
operator struct addrinfo *();
private:
struct addrinfo* info;
struct addrinfo *info;
};
}

View File

@ -11,7 +11,7 @@ NetworkEndpoint::NetworkEndpoint() : port_(0), family_(0) {
memset(port_str_, 0, sizeof port_str_);
}
NetworkEndpoint::NetworkEndpoint(const char* addr, const char* port) {
NetworkEndpoint::NetworkEndpoint(const char *addr, const char *port) {
if (addr == nullptr) throw NetworkEndpointException("Address can't be null!");
if (port == nullptr) throw NetworkEndpointException("Port can't be null!");
@ -19,27 +19,6 @@ NetworkEndpoint::NetworkEndpoint(const char* addr, const char* port) {
snprintf(address_, sizeof address_, "%s", addr);
snprintf(port_str_, sizeof port_str_, "%s", port);
is_address_valid();
int ret = sscanf(port, "%hu", &port_);
if (ret != 1) throw NetworkEndpointException("Port isn't valid!");
}
NetworkEndpoint::NetworkEndpoint(const std::string& addr,
const std::string& port)
: NetworkEndpoint(addr.c_str(), port.c_str()) {}
NetworkEndpoint::NetworkEndpoint(const char* addr, unsigned short port) {
if (addr == nullptr) throw NetworkEndpointException("Address can't be null!");
snprintf(address_, sizeof address_, "%s", addr);
snprintf(port_str_, sizeof port_str_, "%hu", port);
port_ = port;
is_address_valid();
}
void NetworkEndpoint::is_address_valid() {
in_addr addr4;
in6_addr addr6;
int ret = inet_pton(AF_INET, address_, &addr4);
@ -52,10 +31,15 @@ void NetworkEndpoint::is_address_valid() {
family_ = 6;
} else
family_ = 4;
ret = sscanf(port, "%hu", &port_);
if (ret != 1) throw NetworkEndpointException("Port isn't valid!");
}
const char* NetworkEndpoint::address() { return address_; }
const char* NetworkEndpoint::port_str() { return port_str_; }
unsigned short NetworkEndpoint::port() { return port_; }
unsigned char NetworkEndpoint::family() { return family_; }
NetworkEndpoint::NetworkEndpoint(const std::string &addr,
const std::string &port)
: NetworkEndpoint(addr.c_str(), port.c_str()) {}
NetworkEndpoint::NetworkEndpoint(const std::string &addr, unsigned short port)
: NetworkEndpoint(addr.c_str(), std::to_string(port)) {}
}

View File

@ -20,18 +20,16 @@ class NetworkEndpointException : public utils::BasicException {
class NetworkEndpoint {
public:
NetworkEndpoint();
NetworkEndpoint(const char* addr, const char* port);
NetworkEndpoint(const char* addr, unsigned short port);
NetworkEndpoint(const std::string& addr, const std::string& port);
NetworkEndpoint(const std::string &addr, const std::string &port);
NetworkEndpoint(const char *addr, const char *port);
NetworkEndpoint(const std::string &addr, unsigned short port);
const char* address();
const char* port_str();
unsigned short port();
unsigned char family();
const char *address() const { return address_; }
const char *port_str() const { return port_str_; }
int port() const { return port_; }
unsigned char family() const { return family_; }
private:
void is_address_valid();
char address_[INET6_ADDRSTRLEN];
char port_str_[6];
unsigned short port_;

View File

@ -10,10 +10,10 @@
#include <errno.h>
#include <fcntl.h>
#include <netdb.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
@ -24,14 +24,14 @@ namespace io::network {
Socket::Socket() : socket_(-1) {}
Socket::Socket(int sock, NetworkEndpoint& endpoint)
Socket::Socket(int sock, const NetworkEndpoint &endpoint)
: socket_(sock), endpoint_(endpoint) {}
Socket::Socket(const Socket& s) : socket_(s.id()) {}
Socket::Socket(const Socket &s) : socket_(s.id()) {}
Socket::Socket(Socket&& other) { *this = std::forward<Socket>(other); }
Socket::Socket(Socket &&other) { *this = std::forward<Socket>(other); }
Socket& Socket::operator=(Socket&& other) {
Socket &Socket::operator=(Socket &&other) {
socket_ = other.socket_;
endpoint_ = other.endpoint_;
other.socket_ = -1;
@ -51,12 +51,12 @@ void Socket::Close() {
bool Socket::IsOpen() { return socket_ != -1; }
bool Socket::Connect(NetworkEndpoint& endpoint) {
bool Socket::Connect(const NetworkEndpoint &endpoint) {
if (UNLIKELY(socket_ != -1)) return false;
auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str());
for (struct addrinfo* it = info; it != nullptr; it = it->ai_next) {
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if (sfd == -1) continue;
if (connect(sfd, it->ai_addr, it->ai_addrlen) == 0) {
@ -70,12 +70,12 @@ bool Socket::Connect(NetworkEndpoint& endpoint) {
return true;
}
bool Socket::Bind(NetworkEndpoint& endpoint) {
bool Socket::Bind(const NetworkEndpoint &endpoint) {
if (UNLIKELY(socket_ != -1)) return false;
auto info = AddrInfo::Get(endpoint.address(), endpoint.port_str());
for (struct addrinfo* it = info; it != nullptr; it = it->ai_next) {
for (struct addrinfo *it = info; it != nullptr; it = it->ai_next) {
int sfd = socket(it->ai_family, it->ai_socktype, it->ai_protocol);
if (sfd == -1) continue;
@ -94,7 +94,7 @@ bool Socket::Bind(NetworkEndpoint& endpoint) {
// detect bound port, used when the server binds to a random port
struct sockaddr_in6 portdata;
socklen_t portdatalen = sizeof(portdata);
if (getsockname(socket_, (struct sockaddr *) &portdata, &portdatalen) < 0) {
if (getsockname(socket_, (struct sockaddr *)&portdata, &portdatalen) < 0) {
return false;
}
@ -122,16 +122,16 @@ bool Socket::SetKeepAlive() {
if (setsockopt(socket_, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0)
return false;
optval = 20; // wait 120s before sending keep-alive packets
if (setsockopt(socket_, SOL_TCP, TCP_KEEPIDLE, (void*)&optval, optlen) < 0)
optval = 20;  // wait 120s before sending keep-alive packets
if (setsockopt(socket_, SOL_TCP, TCP_KEEPIDLE, (void *)&optval, optlen) < 0)
return false;
optval = 4; // 4 keep-alive packets must fail to close
if (setsockopt(socket_, SOL_TCP, TCP_KEEPCNT, (void*)&optval, optlen) < 0)
optval = 4; // 4 keep-alive packets must fail to close
if (setsockopt(socket_, SOL_TCP, TCP_KEEPCNT, (void *)&optval, optlen) < 0)
return false;
optval = 15; // send keep-alive packets every 15s
if (setsockopt(socket_, SOL_TCP, TCP_KEEPINTVL, (void*)&optval, optlen) < 0)
optval = 15; // send keep-alive packets every 15s
if (setsockopt(socket_, SOL_TCP, TCP_KEEPINTVL, (void *)&optval, optlen) < 0)
return false;
return true;
@ -141,7 +141,7 @@ bool Socket::SetNoDelay() {
int optval = 1;
socklen_t optlen = sizeof(optval);
if (setsockopt(socket_, SOL_TCP, TCP_NODELAY, (void*)&optval, optlen) < 0)
if (setsockopt(socket_, SOL_TCP, TCP_NODELAY, (void *)&optval, optlen) < 0)
return false;
return true;
@ -163,24 +163,24 @@ bool Socket::SetTimeout(long sec, long usec) {
bool Socket::Listen(int backlog) { return listen(socket_, backlog) == 0; }
bool Socket::Accept(Socket* s) {
bool Socket::Accept(Socket *s) {
sockaddr_storage addr;
socklen_t addr_size = sizeof addr;
char addr_decoded[INET6_ADDRSTRLEN];
void* addr_src;
void *addr_src;
unsigned short port;
unsigned char family;
int sfd = accept(socket_, (struct sockaddr*)&addr, &addr_size);
int sfd = accept(socket_, (struct sockaddr *)&addr, &addr_size);
if (UNLIKELY(sfd == -1)) return false;
if (addr.ss_family == AF_INET) {
addr_src = (void*)&(((sockaddr_in*)&addr)->sin_addr);
port = ntohs(((sockaddr_in*)&addr)->sin_port);
addr_src = (void *)&(((sockaddr_in *)&addr)->sin_addr);
port = ntohs(((sockaddr_in *)&addr)->sin_port);
family = 4;
} else {
addr_src = (void*)&(((sockaddr_in6*)&addr)->sin6_addr);
port = ntohs(((sockaddr_in6*)&addr)->sin6_port);
addr_src = (void *)&(((sockaddr_in6 *)&addr)->sin6_addr);
port = ntohs(((sockaddr_in6 *)&addr)->sin6_port);
family = 6;
}
@ -189,7 +189,7 @@ bool Socket::Accept(Socket* s) {
NetworkEndpoint endpoint;
try {
endpoint = NetworkEndpoint(addr_decoded, port);
} catch (NetworkEndpointException& e) {
} catch (NetworkEndpointException &e) {
return false;
}
@ -201,17 +201,17 @@ bool Socket::Accept(Socket* s) {
Socket::operator int() { return socket_; }
int Socket::id() const { return socket_; }
NetworkEndpoint& Socket::endpoint() { return endpoint_; }
const NetworkEndpoint &Socket::endpoint() const { return endpoint_; }
bool Socket::Write(const std::string& str) {
bool Socket::Write(const std::string &str) {
return Write(str.c_str(), str.size());
}
bool Socket::Write(const char* data, size_t len) {
return Write(reinterpret_cast<const uint8_t*>(data), len);
bool Socket::Write(const char *data, size_t len) {
return Write(reinterpret_cast<const uint8_t *>(data), len);
}
bool Socket::Write(const uint8_t* data, size_t len) {
bool Socket::Write(const uint8_t *data, size_t len) {
while (len > 0) {
// MSG_NOSIGNAL is here to disable raising a SIGPIPE
// signal when a connection dies mid-write, the socket
@ -224,7 +224,7 @@ bool Socket::Write(const uint8_t* data, size_t len) {
return true;
}
int Socket::Read(void* buffer, size_t len) {
int Socket::Read(void *buffer, size_t len) {
return read(socket_, buffer, len);
}
}

View File

@ -15,9 +15,9 @@ namespace io::network {
class Socket {
public:
Socket();
Socket(const Socket& s);
Socket(Socket&& other);
Socket& operator=(Socket&& other);
Socket(const Socket &s);
Socket(Socket &&other);
Socket &operator=(Socket &&other);
~Socket();
/**
@ -43,7 +43,7 @@ class Socket {
* true if the connect succeeded
* false if the connect failed
*/
bool Connect(NetworkEndpoint& endpoint);
bool Connect(const NetworkEndpoint &endpoint);
/**
* Binds the socket to the specified endpoint.
@ -54,12 +54,13 @@ class Socket {
* true if the bind succeeded
* false if the bind failed
*/
bool Bind(NetworkEndpoint& endpoint);
bool Bind(const NetworkEndpoint &endpoint);
/**
* Start listening on the bound socket.
*
* @param backlog maximum number of pending connections in the connection queue
* @param backlog maximum number of pending connections in the connection
* queue
*
* @return listen success status:
* true if the listen succeeded
@ -74,10 +75,11 @@ class Socket {
* @param s Socket object that will be instantiated with the new connection
*
* @return accept success status:
* true if a new connection was accepted and the socket 's' was instantiated
* true if a new connection was accepted and the socket 's' was
* instantiated
* false if a new connection accept failed
*/
bool Accept(Socket* s);
bool Accept(Socket *s);
/**
* Sets the socket to non-blocking.
@ -132,7 +134,7 @@ class Socket {
/**
* Returns the currently active endpoint of the socket.
*/
NetworkEndpoint& endpoint();
const NetworkEndpoint &endpoint() const;
/**
* Write data to the socket.
@ -146,9 +148,9 @@ class Socket {
* true if write succeeded
* false if write failed
*/
bool Write(const std::string& str);
bool Write(const char* data, size_t len);
bool Write(const uint8_t* data, size_t len);
bool Write(const std::string &str);
bool Write(const char *data, size_t len);
bool Write(const uint8_t *data, size_t len);
/**
* Read data from the socket.
@ -162,10 +164,10 @@ class Socket {
* == 0 if the client closed the connection
* < 0 if an error has occurred
*/
int Read(void* buffer, size_t len);
int Read(void *buffer, size_t len);
private:
Socket(int sock, NetworkEndpoint& endpoint);
Socket(int sock, const NetworkEndpoint &endpoint);
int socket_;
NetworkEndpoint endpoint_;

View File

@ -10,6 +10,10 @@ message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}")
# postgres directory
set(postgres_dir ${libs_dir}/postgresql)
# add target that depends on all other targets
set(all_targets_target ${project_name}__${test_type})
add_custom_target(${all_targets_target})
# for each cpp file build binary and register test
foreach(test_cpp ${test_type_cpps})
@ -35,4 +39,7 @@ foreach(test_cpp ${test_type_cpps})
target_link_libraries(${target_name} "${postgres_dir}/lib/libpq.so")
target_include_directories(${target_name} PUBLIC "${postgres_dir}/include")
# add target to dependencies
add_dependencies(${all_targets_target} ${target_name})
endforeach()

View File

@ -0,0 +1,136 @@
import logging
import os
import time
import json
import tempfile
from common import get_absolute_path, WALL_TIME, CPU_TIME
log = logging.getLogger(__name__)
try:
import jail
APOLLO = True
except:
import jail_faker as jail
APOLLO = False
# This could be a function, not a class, but we want to reuse the jail process
# since we can only instantiate 8 of them.
class QueryClient:
def __init__(self, args, cpus=None):
self.log = logging.getLogger("QueryClient")
self.client = jail.get_process()
if cpus:
self.client.set_cpus(cpus)
def __call__(self, queries, database, num_client_workers):
self.log.debug("execute('%s')", str(queries))
client_path = "tests/macro_benchmark/query_client"
client = get_absolute_path(client_path, "build")
if not os.path.exists(client):
# Apollo builds both debug and release binaries on diff
# so we need to use the release client if the debug one
# doesn't exist
client = get_absolute_path(client_path, "build_release")
queries_fd, queries_path = tempfile.mkstemp()
try:
queries_file = os.fdopen(queries_fd, "w")
queries_file.write("\n".join(queries))
queries_file.close()
except:
queries_file.close()
os.remove(queries_path)
raise Exception("Writing queries to temporary file failed")
output_fd, output = tempfile.mkstemp()
os.close(output_fd)
client_args = ["--port", database.args.port,
"--num-workers", str(num_client_workers),
"--output", output]
cpu_time_start = database.database_bin.get_usage()["cpu"]
# TODO make the timeout configurable per query or something
return_code = self.client.run_and_wait(
client, client_args, timeout=600, stdin=queries_path)
cpu_time_end = database.database_bin.get_usage()["cpu"]
os.remove(queries_path)
if return_code != 0:
with open(self.client.get_stderr()) as f:
stderr = f.read()
self.log.error("Error while executing queries '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(queries), return_code, stderr)
raise Exception("BoltClient execution failed")
with open(output) as f:
data = json.loads(f.read())
data[CPU_TIME] = cpu_time_end - cpu_time_start
os.remove(output)
return data
class LongRunningClient:
def __init__(self, args, cpus=None):
self.log = logging.getLogger("LongRunningClient")
self.client = jail.get_process()
if cpus:
self.client.set_cpus(cpus)
# TODO: This is quite similar to __call__ method of QueryClient. Remove
# duplication.
def __call__(self, config, database, duration, num_client_workers):
self.log.debug("execute('%s')", config)
client_path = "tests/macro_benchmark/long_running_client"
client = get_absolute_path(client_path, "build")
if not os.path.exists(client):
# Apollo builds both debug and release binaries on diff
# so we need to use the release client if the debug one
# doesn't exist
client = get_absolute_path(client_path, "build_release")
config_fd, config_path = tempfile.mkstemp()
try:
config_file = os.fdopen(config_fd, "w")
print(json.dumps(config, indent=4), file=config_file)
config_file.close()
except:
config_file.close()
os.remove(config_path)
raise Exception("Writing config to temporary file failed")
output_fd, output = tempfile.mkstemp()
os.close(output_fd)
client_args = ["--port", database.args.port,
"--num-workers", str(num_client_workers),
"--output", output,
"--duration", str(duration)]
return_code = self.client.run_and_wait(
client, client_args, timeout=600, stdin=config_path)
os.remove(config_path)
if return_code != 0:
with open(self.client.get_stderr()) as f:
stderr = f.read()
self.log.error("Error while executing config '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(config), return_code, stderr)
raise Exception("BoltClient execution failed")
# TODO: We shouldn't wait for process to finish to start reading output.
# We should implement periodic reading of data and stream data when it
# becomes available.
data = []
with open(output) as f:
for line in f:
data.append(json.loads(line))
os.remove(output)
return data
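
The TODO above notes that the harness should read the long-running client's output periodically instead of waiting for the process to finish. A minimal sketch of that approach, assuming the client writes one JSON object per line and flushes it, and given a hypothetical is_running() callable:

import json
import time

def stream_results(output_path, is_running, poll_interval=1.0):
    """Yield one decoded JSON object per complete line written to output_path,
    polling as long as is_running() reports the client is still alive."""
    pending = ""
    with open(output_path) as f:
        while True:
            chunk = f.readline()
            if chunk:
                pending += chunk
                if pending.endswith("\n"):
                    yield json.loads(pending)
                    pending = ""
                continue
            if not is_running():
                break  # a real implementation would also drain any remaining lines here
            time.sleep(poll_interval)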

View File

@ -1,7 +1,10 @@
#include <experimental/optional>
#include <map>
#include <string>
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "utils/exceptions.hpp"
namespace {
@ -47,15 +50,17 @@ void PrintJsonDecodedValue(std::ostream &os,
}
}
template <typename ClientT, typename ExceptionT>
template <typename TClient>
communication::bolt::QueryData ExecuteNTimesTillSuccess(
ClientT &client, const std::string &query, int times) {
ExceptionT last_exception;
TClient &client, const std::string &query,
const std::map<std::string, communication::bolt::DecodedValue> &params,
int times) {
std::experimental::optional<utils::BasicException> last_exception;
for (int i = 0; i < times; ++i) {
try {
auto ret = client.Execute(query, {});
auto ret = client.Execute(query, params);
return ret;
} catch (const ExceptionT &e) {
} catch (const utils::BasicException &e) {
last_exception = e;
}
}

View File

@ -0,0 +1,267 @@
// TODO: work in progress.
#include <array>
#include <chrono>
#include <fstream>
#include <iostream>
#include <queue>
#include <random>
#include <sstream>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <json/json.hpp>
#include "common.hpp"
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "io/network/network_endpoint.hpp"
#include "io/network/socket.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/algorithm.hpp"
#include "utils/algorithm.hpp"
#include "utils/assert.hpp"
#include "utils/timer.hpp"
using SocketT = io::network::Socket;
using EndpointT = io::network::NetworkEndpoint;
using Client = communication::bolt::Client<SocketT>;
using communication::bolt::DecodedValue;
using communication::bolt::DecodedVertex;
using communication::bolt::DecodedEdge;
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_string(port, "7687", "Server port");
DEFINE_int32(num_workers, 1, "Number of workers");
DEFINE_string(output, "", "Output file");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
const int MAX_RETRIES = 30;
const int NUM_BUCKETS = 100;
struct VertexAndEdges {
DecodedVertex vertex;
std::vector<DecodedEdge> edges;
std::vector<DecodedVertex> vertices;
};
std::pair<VertexAndEdges, int> DetachDeleteVertex(Client &client,
const std::string &label,
int64_t id) {
auto records =
ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id : $id})-[e]-(m) RETURN n, e, m",
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES)
.records;
if (records.size() == 0U) return {{}, 1};
ExecuteNTimesTillSuccess(
client, "MATCH (n :" + label + " {id : $id})-[]-(m) DETACH DELETE n",
std::map<std::string, DecodedValue>{{"id", id}}, MAX_RETRIES);
std::vector<DecodedEdge> edges;
edges.reserve(records.size());
for (const auto &record : records) {
edges.push_back(record[1].ValueEdge());
}
std::vector<DecodedVertex> vertices;
vertices.reserve(records.size());
for (const auto &record : records) {
vertices.push_back(record[2].ValueVertex());
}
return {{records[0][0].ValueVertex(), edges, vertices}, 2};
}
int ReturnVertexAndEdges(Client &client, const VertexAndEdges &vertex_and_edges,
const std::string &independent_label) {
int num_queries = 0;
{
std::stringstream os;
os << "CREATE (n :";
PrintIterable(os, vertex_and_edges.vertex.labels, ":");
os << " {";
PrintIterable(os, vertex_and_edges.vertex.properties, ", ",
[&](auto &stream, const auto &pair) {
if (pair.second.type() == DecodedValue::Type::String) {
stream << pair.first << ": \"" << pair.second << "\"";
} else {
stream << pair.first << ": " << pair.second;
}
});
os << "})";
ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES);
++num_queries;
}
for (int i = 0; i < static_cast<int>(vertex_and_edges.vertices.size()); ++i) {
std::stringstream os;
os << "MATCH (n :" << independent_label
<< " {id: " << vertex_and_edges.vertex.properties.at("id") << "}) ";
os << "MATCH (m :" << independent_label
<< " {id: " << vertex_and_edges.vertices[i].properties.at("id") << "}) ";
const auto &edge = vertex_and_edges.edges[i];
os << "CREATE (n)";
if (edge.to == vertex_and_edges.vertex.id) {
os << "<-";
} else {
os << "-";
}
os << "[:" << edge.type << " {";
PrintIterable(os, edge.properties, ", ",
[&](auto &stream, const auto &pair) {
if (pair.second.type() == DecodedValue::Type::String) {
stream << pair.first << ": \"" << pair.second << "\"";
} else {
stream << pair.first << ": " << pair.second;
}
});
os << "}]";
if (edge.from == vertex_and_edges.vertex.id) {
os << "->";
} else {
os << "-";
}
os << "(m)";
os << " RETURN n.id";
auto ret = ExecuteNTimesTillSuccess(client, os.str(), {}, MAX_RETRIES);
auto x = ret.metadata["plan_execution_time"];
auto y = ret.metadata["planning_time"];
if (x.type() == DecodedValue::Type::Double) {
LOG_EVERY_N(INFO, 5000) << "exec " << x.ValueDouble() << " planning "
<< y.ValueDouble();
CHECK(ret.records.size() == 1U) << "Graph in invalid state";
}
++num_queries;
}
return num_queries;
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
nlohmann::json config;
std::cin >> config;
const auto &queries = config["queries"];
const double read_probability = config["read_probability"];
const int64_t num_independent_nodes = config["num_independent_nodes"];
const std::string independent_label = config["independent_label"];
const int64_t num_nodes = config["num_nodes"];
utils::Timer timer;
std::vector<std::thread> threads;
std::atomic<int64_t> executed_queries{0};
std::atomic<bool> keep_running{true};
for (int i = 0; i < FLAGS_num_workers; ++i) {
threads.emplace_back(
[&](int thread_id) {
// Initialise client.
SocketT socket;
EndpointT endpoint;
try {
endpoint = EndpointT(FLAGS_address, FLAGS_port);
} catch (const io::network::NetworkEndpointException &e) {
LOG(FATAL) << "Invalid address or port: " << FLAGS_address << ":"
<< FLAGS_port;
}
if (!socket.Connect(endpoint)) {
LOG(FATAL) << "Could not connect to: " << FLAGS_address << ":"
<< FLAGS_port;
}
Client client(std::move(socket), FLAGS_username, FLAGS_password);
std::mt19937 random_gen(thread_id);
int64_t to_remove =
num_independent_nodes / FLAGS_num_workers * thread_id + 1;
int64_t last_to_remove =
to_remove + num_independent_nodes / FLAGS_num_workers;
bool remove = true;
int64_t num_shifts = 0;
std::vector<VertexAndEdges> removed;
while (keep_running) {
std::uniform_real_distribution<> real_dist(0.0, 1.0);
// Read query.
if (real_dist(random_gen) < read_probability) {
std::uniform_int_distribution<> read_query_dist(
0, static_cast<int>(queries.size()) - 1);
const auto &query = queries[read_query_dist(random_gen)];
std::map<std::string, DecodedValue> params;
for (const auto &param : query["params"]) {
std::uniform_int_distribution<int64_t> param_value_dist(
param["low"], param["high"]);
params[param["name"]] = param_value_dist(random_gen);
}
ExecuteNTimesTillSuccess(client, query["query"], params,
MAX_RETRIES);
++executed_queries;
} else {
if (!remove) {
executed_queries += ReturnVertexAndEdges(client, removed.back(),
independent_label);
removed.pop_back();
if (removed.empty()) {
remove = true;
}
} else {
auto ret =
DetachDeleteVertex(client, independent_label, to_remove);
++to_remove;
executed_queries += ret.second;
if (ret.second > 1) {
removed.push_back(std::move(ret.first));
}
if (to_remove == last_to_remove) {
for (auto &x : removed) {
x.vertex.properties["id"].ValueInt() += num_nodes;
}
remove = false;
++num_shifts;
to_remove =
num_independent_nodes / FLAGS_num_workers * thread_id +
1 + num_shifts * num_nodes;
last_to_remove =
to_remove + num_independent_nodes / FLAGS_num_workers;
}
}
}
}
client.Close();
},
i);
}
// Open stream for writing stats.
std::streambuf *buf;
std::ofstream f;
if (FLAGS_output != "") {
f.open(FLAGS_output);
buf = f.rdbuf();
} else {
buf = std::cout.rdbuf();
}
std::ostream out(buf);
while (timer.Elapsed().count() < FLAGS_duration) {
using namespace std::chrono_literals;
out << "{ \"num_executed_queries\": " << executed_queries << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count() << "}" << std::endl;
out.flush();
std::this_thread::sleep_for(1s);
}
keep_running = false;
for (int i = 0; i < FLAGS_num_workers; ++i) {
threads[i].join();
}
return 0;
}

View File

@ -6,7 +6,6 @@
#include <fmt/format.h>
#include <glog/logging.h>
#include <libpq-fe.h>
#include "communication/bolt/client.hpp"
@ -49,10 +48,13 @@ class Client {
}
}
QueryData Execute(const std::string &query,
const std::map<std::string, std::string> &parameters) {
QueryData Execute(
const std::string &query,
const std::map<std::string, communication::bolt::DecodedValue>
&parameters) {
QueryData ret;
CHECK(parameters.size() == 0U) << "Parameters not yet supported";
DLOG(INFO) << "Sending run message with statement: '" << query << "'";
result_ = PQexec(connection_, query.c_str());

View File

@ -4,6 +4,7 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "threading/sync/spinlock.hpp"
#include "utils/algorithm.hpp"
@ -11,7 +12,7 @@
#include "bolt_client.hpp"
#include "common.hpp"
#include "postgres_client.hpp"
//#include "postgres_client.hpp"
DEFINE_string(protocol, "bolt", "Protocol to use (available: bolt, postgres)");
DEFINE_int32(num_workers, 1, "Number of workers");
@ -83,9 +84,8 @@ void ExecuteQueries(std::istream &istream, int num_workers,
str = queries[pos];
}
try {
metadata[pos] = ExecuteNTimesTillSuccess<ClientT, ExceptionT>(
client, str, MAX_RETRIES)
.metadata;
metadata[pos] =
ExecuteNTimesTillSuccess(client, str, {}, MAX_RETRIES).metadata;
} catch (const ExceptionT &e) {
LOG(FATAL) << "Could not execute query '" << str << "' "
<< MAX_RETRIES << " times! Error message: " << e.what();
@ -105,12 +105,6 @@ void ExecuteQueries(std::istream &istream, int num_workers,
PrintSummary(ostream, duration, metadata);
}
using BoltClientT = BoltClient;
using BoltExceptionT = communication::bolt::ClientQueryException;
using PostgresClientT = postgres::Client;
using PostgresExceptionT = postgres::ClientQueryException;
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
@ -134,18 +128,27 @@ int main(int argc, char **argv) {
std::string port = FLAGS_port;
if (FLAGS_protocol == "bolt") {
if (port == "") port = "7687";
using BoltClientT = BoltClient;
using BoltExceptionT = communication::bolt::ClientQueryException;
ExecuteQueries<BoltClientT, BoltExceptionT>(
*istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
FLAGS_username, FLAGS_password, FLAGS_database);
} else if (FLAGS_protocol == "postgres") {
permanent_assert(FLAGS_username != "",
"Username can't be empty for postgres!");
permanent_assert(FLAGS_database != "",
"Database can't be empty for postgres!");
if (port == "") port = "5432";
ExecuteQueries<PostgresClientT, PostgresExceptionT>(
*istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
FLAGS_username, FLAGS_password, FLAGS_database);
LOG(FATAL) << "Postgres not yet supported";
// TODO: Currently libpq is linked dynamically so it is a pain to move
// harness_client executable to other machines without libpq.
// CHECK(FLAGS_username != "") << "Username can't be empty for
// postgres!";
// CHECK(FLAGS_database != "") << "Database can't be empty for
// postgres!";
// if (port == "") port = "5432";
//
// using PostgresClientT = postgres::Client;
// using PostgresExceptionT = postgres::ClientQueryException;
// ExecuteQueries<PostgresClientT, PostgresExceptionT>(
// *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
// FLAGS_username, FLAGS_password, FLAGS_database);
}
return 0;

View File

@ -0,0 +1,19 @@
import os
WALL_TIME = "wall_time"
CPU_TIME = "cpu_time"
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
def get_absolute_path(path, base=""):
if base == "build":
extra = "../../../build"
elif base == "build_release":
extra = "../../../build_release"
elif base == "libs":
extra = "../../../libs"
elif base == "config":
extra = "../../../config"
else:
extra = ""
return os.path.normpath(os.path.join(DIR_PATH, extra, path))

View File

@ -322,4 +322,3 @@ dbms.udc.enabled=false
# Disable query cache
dbms.query_cache_size=0

View File

@ -0,0 +1,324 @@
#*****************************************************************
# Neo4j configuration
#
# For more details and a complete list of settings, please see
# https://neo4j.com/docs/operations-manual/current/reference/configuration-settings/
#*****************************************************************
# The name of the database to mount
#dbms.active_database=graph.db
# Paths of directories in the installation.
#dbms.directories.data=/var/lib/neo4j/data
#dbms.directories.plugins=/var/lib/neo4j/plugins
#dbms.directories.certificates=/var/lib/neo4j/certificates
#dbms.directories.logs=/var/log/neo4j
#dbms.directories.lib=/usr/share/neo4j/lib
#dbms.directories.run=/var/run/neo4j
# This setting constrains all `LOAD CSV` import files to be under the `import` directory. Remove or comment it out to
# allow files to be loaded from anywhere in the filesystem; this introduces possible security problems. See the
# `LOAD CSV` section of the manual for details.
#dbms.directories.import=/var/lib/neo4j/import
# Whether requests to Neo4j are authenticated.
# To disable authentication, uncomment this line
dbms.security.auth_enabled=false
# Enable this to be able to upgrade a store from an older version.
#dbms.allow_format_migration=true
# Java Heap Size: by default the Java heap size is dynamically
# calculated based on available system resources.
# Uncomment these lines to set specific initial and maximum
# heap size.
#dbms.memory.heap.initial_size=512m
#dbms.memory.heap.max_size=512m
# The amount of memory to use for mapping the store files, in bytes (or
# kilobytes with the 'k' suffix, megabytes with 'm' and gigabytes with 'g').
# If Neo4j is running on a dedicated server, then it is generally recommended
# to leave about 2-4 gigabytes for the operating system, give the JVM enough
# heap to hold all your transaction state and query context, and then leave the
# rest for the page cache.
# The default page cache memory assumes the machine is dedicated to running
# Neo4j, and is heuristically set to 50% of RAM minus the max Java heap size.
#dbms.memory.pagecache.size=10g
#*****************************************************************
# Network connector configuration
#*****************************************************************
# With default configuration Neo4j only accepts local connections.
# To accept non-local connections, uncomment this line:
#dbms.connectors.default_listen_address=0.0.0.0
# You can also choose a specific network interface, and configure a non-default
# port for each connector, by setting their individual listen_address.
# The address at which this server can be reached by its clients. This may be the server's IP address or DNS name, or
# it may be the address of a reverse proxy which sits in front of the server. This setting may be overridden for
# individual connectors below.
#dbms.connectors.default_advertised_address=localhost
# You can also choose a specific advertised hostname or IP address, and
# configure an advertised port for each connector, by setting their
# individual advertised_address.
# Bolt connector
dbms.connector.bolt.enabled=true
#dbms.connector.bolt.tls_level=OPTIONAL
#dbms.connector.bolt.listen_address=:7687
# HTTP Connector. There must be exactly one HTTP connector.
dbms.connector.http.enabled=true
#dbms.connector.http.listen_address=:7474
# HTTPS Connector. There can be zero or one HTTPS connectors.
dbms.connector.https.enabled=false
#dbms.connector.https.listen_address=:7473
# Number of Neo4j worker threads.
#dbms.threads.worker_count=
#*****************************************************************
# SSL system configuration
#*****************************************************************
# Names of the SSL policies to be used for the respective components.
# The legacy policy is a special policy which is not defined in
# the policy configuration section, but rather derives from
# dbms.directories.certificates and associated files
# (by default: neo4j.key and neo4j.cert). Its use will be deprecated.
# The policies to be used for connectors.
#
# N.B: Note that a connector must be configured to support/require
# SSL/TLS for the policy to actually be utilized.
#
# see: dbms.connector.*.tls_level
#bolt.ssl_policy=legacy
#https.ssl_policy=legacy
#*****************************************************************
# SSL policy configuration
#*****************************************************************
# Each policy is configured under a separate namespace, e.g.
# dbms.ssl.policy.<policyname>.*
#
# The example settings below are for a new policy named 'default'.
# The base directory for cryptographic objects. Each policy will by
# default look for its associated objects (keys, certificates, ...)
# under the base directory.
#
# Every such setting can be overriden using a full path to
# the respective object, but every policy will by default look
# for cryptographic objects in its base location.
#
# Mandatory setting
#dbms.ssl.policy.default.base_directory=certificates/default
# Allows the generation of a fresh private key and a self-signed
# certificate if none are found in the expected locations. It is
# recommended to turn this off again after keys have been generated.
#
# Keys should in general be generated and distributed offline
# by a trusted certificate authority (CA) and not by utilizing
# this mode.
#dbms.ssl.policy.default.allow_key_generation=false
# Enabling this makes it so that this policy ignores the contents
# of the trusted_dir and simply resorts to trusting everything.
#
# Use of this mode is discouraged. It would offer encryption but no security.
#dbms.ssl.policy.default.trust_all=false
# The private key for the default SSL policy. By default a file
# named private.key is expected under the base directory of the policy.
# It is mandatory that a key can be found or generated.
#dbms.ssl.policy.default.private_key=
# The private key for the default SSL policy. By default a file
# named public.crt is expected under the base directory of the policy.
# It is mandatory that a certificate can be found or generated.
#dbms.ssl.policy.default.public_certificate=
# The certificates of trusted parties. By default a directory named
# 'trusted' is expected under the base directory of the policy. It is
# mandatory to create the directory so that it exists, because it cannot
# be auto-created (for security purposes).
#
# To enforce client authentication client_auth must be set to 'require'!
#dbms.ssl.policy.default.trusted_dir=
# Client authentication setting. Values: none, optional, require
# The default is to require client authentication.
#
# Servers are always authenticated unless explicitly overridden
# using the trust_all setting. In a mutual authentication setup this
# should be kept at the default of require and trusted certificates
# must be installed in the trusted_dir.
#dbms.ssl.policy.default.client_auth=require
# A comma-separated list of allowed TLS versions.
# By default TLSv1, TLSv1.1 and TLSv1.2 are allowed.
#dbms.ssl.policy.default.tls_versions=
# A comma-separated list of allowed ciphers.
# The default ciphers are the defaults of the JVM platform.
#dbms.ssl.policy.default.ciphers=
#*****************************************************************
# Logging configuration
#*****************************************************************
# To enable HTTP logging, uncomment this line
#dbms.logs.http.enabled=true
# Number of HTTP logs to keep.
#dbms.logs.http.rotation.keep_number=5
# Size of each HTTP log that is kept.
#dbms.logs.http.rotation.size=20m
# To enable GC Logging, uncomment this line
#dbms.logs.gc.enabled=true
# GC Logging Options
# see http://docs.oracle.com/cd/E19957-01/819-0084-10/pt_tuningjava.html#wp57013 for more information.
#dbms.logs.gc.options=-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintPromotionFailure -XX:+PrintTenuringDistribution
# Number of GC logs to keep.
#dbms.logs.gc.rotation.keep_number=5
# Size of each GC log that is kept.
#dbms.logs.gc.rotation.size=20m
# Size threshold for rotation of the debug log. If set to zero then no rotation will occur. Accepts a binary suffix "k",
# "m" or "g".
#dbms.logs.debug.rotation.size=20m
# Maximum number of history files for the internal log.
#dbms.logs.debug.rotation.keep_number=7
#*****************************************************************
# Miscellaneous configuration
#*****************************************************************
# Enable this to specify a parser other than the default one.
#cypher.default_language_version=3.0
# Determines if Cypher will allow using file URLs when loading data using
# `LOAD CSV`. Setting this value to `false` will cause Neo4j to fail `LOAD CSV`
# clauses that load data from the file system.
#dbms.security.allow_csv_import_from_file_urls=true
# Retention policy for transaction logs needed to perform recovery and backups.
dbms.tx_log.rotation.retention_policy=1 days
# Enable a remote shell server which Neo4j Shell clients can log in to.
#dbms.shell.enabled=true
# The network interface IP the shell will listen on (use 0.0.0.0 for all interfaces).
#dbms.shell.host=127.0.0.1
# The port the shell will listen on, default is 1337.
#dbms.shell.port=1337
# Only allow read operations from this Neo4j instance. This mode still requires
# write access to the directory for lock purposes.
#dbms.read_only=false
# Comma separated list of JAX-RS packages containing JAX-RS resources, one
# package name for each mountpoint. The listed package names will be loaded
# under the mountpoints specified. Uncomment this line to mount the
# org.neo4j.examples.server.unmanaged.HelloWorldResource.java from
# neo4j-server-examples under /examples/unmanaged, resulting in a final URL of
# http://localhost:7474/examples/unmanaged/helloworld/{nodeId}
#dbms.unmanaged_extension_classes=org.neo4j.examples.server.unmanaged=/examples/unmanaged
#********************************************************************
# JVM Parameters
#********************************************************************
# G1GC generally strikes a good balance between throughput and tail
# latency, without too much tuning.
dbms.jvm.additional=-XX:+UseG1GC
# Have common exceptions keep producing stack traces, so they can be
# debugged regardless of how often logs are rotated.
dbms.jvm.additional=-XX:-OmitStackTraceInFastThrow
# Make sure that `initmemory` is not only allocated, but committed to
# the process, before starting the database. This reduces memory
# fragmentation, increasing the effectiveness of transparent huge
# pages. It also reduces the possibility of seeing performance drop
# due to heap-growing GC events, where a decrease in available page
# cache leads to an increase in mean IO response time.
# Try reducing the heap memory, if this flag degrades performance.
dbms.jvm.additional=-XX:+AlwaysPreTouch
# Trust that non-static final fields are really final.
# This allows more optimizations and improves overall performance.
# NOTE: Disable this if you use embedded mode, or have extensions or dependencies that may use reflection or
# serialization to change the value of final fields!
dbms.jvm.additional=-XX:+UnlockExperimentalVMOptions
dbms.jvm.additional=-XX:+TrustFinalNonStaticFields
# Disable explicit garbage collection, which is occasionally invoked by the JDK itself.
dbms.jvm.additional=-XX:+DisableExplicitGC
# Remote JMX monitoring, uncomment and adjust the following lines as needed. Absolute paths to jmx.access and
# jmx.password files are required.
# Also make sure to update the jmx.access and jmx.password files with appropriate permission roles and passwords,
# the shipped configuration contains only a read only role called 'monitor' with password 'Neo4j'.
# For more details, see: http://download.oracle.com/javase/8/docs/technotes/guides/management/agent.html
# On Unix based systems the jmx.password file needs to be owned by the user that will run the server,
# and have permissions set to 0600.
# For details on setting these file permissions on Windows see:
# http://docs.oracle.com/javase/8/docs/technotes/guides/management/security-windows.html
#dbms.jvm.additional=-Dcom.sun.management.jmxremote.port=3637
#dbms.jvm.additional=-Dcom.sun.management.jmxremote.authenticate=true
#dbms.jvm.additional=-Dcom.sun.management.jmxremote.ssl=false
#dbms.jvm.additional=-Dcom.sun.management.jmxremote.password.file=/absolute/path/to/conf/jmx.password
#dbms.jvm.additional=-Dcom.sun.management.jmxremote.access.file=/absolute/path/to/conf/jmx.access
# Some systems cannot discover host name automatically, and need this line configured:
#dbms.jvm.additional=-Djava.rmi.server.hostname=$THE_NEO4J_SERVER_HOSTNAME
# Expand Diffie Hellman (DH) key size from default 1024 to 2048 for DH-RSA cipher suites used in server TLS handshakes.
# This is to protect the server from any potential passive eavesdropping.
dbms.jvm.additional=-Djdk.tls.ephemeralDHKeySize=2048
#********************************************************************
# Wrapper Windows NT/2000/XP Service Properties
#********************************************************************
# WARNING - Do not modify any of these properties when an application
# using this configuration file has been installed as a service.
# Please uninstall the service before modifying this section. The
# service can then be reinstalled.
# Name of the service
dbms.windows_service_name=neo4j
#********************************************************************
# Other Neo4j system properties
#********************************************************************
dbms.jvm.additional=-Dunsupported.dbms.udc.source=debian
# Disable Neo4j usage data collection
dbms.udc.enabled=false
# Query cache size
dbms.query_cache_size=1000

View File

@ -0,0 +1,169 @@
import logging
import os
import subprocess
from argparse import ArgumentParser
from collections import defaultdict
import tempfile
import shutil
import time
from common import get_absolute_path
try:
import jail
APOLLO = True
except:
import jail_faker as jail
APOLLO = False
def wait_for_server(port, delay=1.0):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
while subprocess.call(cmd) != 0:
time.sleep(0.5)
time.sleep(delay)
class Memgraph:
"""
Knows how to start and stop memgraph.
"""
def __init__(self, args, config, num_workers, cpus=None):
self.log = logging.getLogger("MemgraphRunner")
argp = ArgumentParser("MemgraphArgumentParser")
argp.add_argument("--runner-bin",
default=get_absolute_path("memgraph", "build"))
argp.add_argument("--port", default="7687",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.config = config
self.num_workers = num_workers
self.database_bin = jail.get_process()
if cpus:
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
env = {"MEMGRAPH_CONFIG": self.config}
database_args = ["--port", self.args.port]
if self.num_workers:
database_args += ["--num_workers", self.num_workers]
# find executable path
runner_bin = self.args.runner_bin
if not os.path.exists(runner_bin):
# Apollo builds both debug and release binaries on diff
# so we need to use the release binary if the debug one
# doesn't exist
runner_bin = get_absolute_path("memgraph", "build_release")
# start memgraph
self.database_bin.run(runner_bin, database_args, env=env, timeout=600)
wait_for_server(self.args.port)
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
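
A short usage sketch for the runner interface above (the Memgraph runner shown; the config file name and worker count are placeholders):

from common import get_absolute_path

# Config file name is a placeholder; num_workers is forwarded to --num_workers.
db = Memgraph(args=[], config=get_absolute_path("benchmarking.conf", "config"),
              num_workers="4")
db.start()
# ... drive QueryClient / LongRunningClient against db.args.port ...
db.stop()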
class Neo:
"""
Knows how to start and stop neo4j.
"""
def __init__(self, args, config, cpus=None):
self.log = logging.getLogger("NeoRunner")
argp = ArgumentParser("NeoArgumentParser")
argp.add_argument("--runner-bin", default=get_absolute_path(
"neo4j/bin/neo4j", "libs"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument("--http-port", default="7474",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.config = config
self.database_bin = jail.get_process()
if cpus:
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
# create home directory
self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm")
try:
os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"),
os.path.join(self.neo4j_home_path, "lib"))
neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf")
neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf")
os.mkdir(neo4j_conf_dir)
shutil.copyfile(self.config, neo4j_conf_file)
with open(neo4j_conf_file, "a") as f:
f.write("\ndbms.connector.bolt.listen_address=:" +
self.args.port + "\n")
f.write("\ndbms.connector.http.listen_address=:" +
self.args.http_port + "\n")
# environment
cwd = os.path.dirname(self.args.runner_bin)
env = {"NEO4J_HOME": self.neo4j_home_path}
self.database_bin.run(self.args.runner_bin, args=["console"],
env=env, timeout=600, cwd=cwd)
except:
shutil.rmtree(self.neo4j_home_path)
raise Exception("Couldn't run Neo4j!")
wait_for_server(self.args.http_port, 2.0)
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
if os.path.exists(self.neo4j_home_path):
shutil.rmtree(self.neo4j_home_path)
class Postgres:
"""
Knows how to start and stop PostgreSQL.
"""
def __init__(self, args, cpus):
self.log = logging.getLogger("PostgresRunner")
argp = ArgumentParser("PostgresArgumentParser")
argp.add_argument("--init-bin", default=get_absolute_path(
"postgresql/bin/initdb", "libs"))
argp.add_argument("--runner-bin", default=get_absolute_path(
"postgresql/bin/postgres", "libs"))
argp.add_argument("--port", default="5432",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.username = "macro_benchmark"
self.database_bin = jail.get_process()
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
self.data_path = tempfile.mkdtemp(dir="/dev/shm")
init_args = ["-D", self.data_path, "-U", self.username]
self.database_bin.run_and_wait(self.args.init_bin, init_args)
# args
runner_args = ["-D", self.data_path, "-c", "port=" + self.args.port,
"-c", "ssl=false", "-c", "max_worker_processes=1"]
try:
self.database_bin.run(self.args.runner_bin, args=runner_args,
timeout=600)
except:
shutil.rmtree(self.data_path)
raise Exception("Couldn't run PostgreSQL!")
wait_for_server(self.args.port)
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
if os.path.exists(self.data_path):
shutil.rmtree(self.data_path)

View File

@ -0,0 +1 @@
pokec_small.setup.cypher

View File

@ -0,0 +1,3 @@
{
"duration": 30
}

View File

@ -0,0 +1,28 @@
{
"num_independent_nodes" : 4111,
"num_nodes" : 10000,
"independent_label": "User",
"read_probability": 0.5,
"queries" : [
{
"query": "MATCH (n :User {id : $id})-[]-(m) RETURN AVG(n.age + m.age)",
"params" : [
{
"name" : "id",
"low" : 1,
"high" : 10000
}
]
},
{
"query": "MATCH (n :User {id : $id})-[]-(m)-[]-(k) RETURN AVG(n.age + m.age + k.age)",
"params" : [
{
"name" : "id",
"low" : 1,
"high" : 10000
}
]
}
]
}

View File

@ -9,12 +9,11 @@ import json
import subprocess
from argparse import ArgumentParser
from collections import OrderedDict
from collections import defaultdict
import tempfile
import shutil
from statistics import median
from common import get_absolute_path
from query_suite import QuerySuite, QueryParallelSuite
from long_running_suite import LongRunningSuite
from perf import Perf
log = logging.getLogger(__name__)
try:
import jail
@ -23,35 +22,52 @@ except:
import jail_faker as jail
APOLLO = False
DIR_PATH = os.path.dirname(os.path.realpath(__file__))
WALL_TIME = "wall_time"
CPU_TIME = "cpu_time"
log = logging.getLogger(__name__)
class Loader:
"""
Loads file contents. Supported types are:
.py - executable that prints out Cypher queries
.cypher - contains Cypher queries in textual form
.json - contains a configuration
A Loader object is callable.
A call to it returns a generator that yields loaded data
(Cypher queries, configuration). In that sense one
Loader is reusable. The generator approach makes it possible
to generate different queries each time a .py file is executed.
"""
def __init__(self, file_path):
self.file_path = file_path
def _queries(self, data):
""" Helper function for breaking down and filtering queries"""
for element in filter(lambda x: x is not None,
map(str.strip, data.replace("\n", " ").split(";"))):
yield element
def __call__(self):
""" Yields queries found in the given file_path one by one """
log.debug("Generating queries from file_path: %s",
self.file_path)
_, extension = os.path.splitext(self.file_path)
if extension == ".cypher":
with open(self.file_path) as f:
return self._queries(f.read())
elif extension == ".py":
return self._queries(subprocess.check_output(
["python3", self.file_path]).decode("ascii"))
elif extension == ".json":
with open(self.file_path) as f:
return [json.load(f)].__iter__()
else:
raise Exception("Unsupported filetype {} ".format(extension))
def __repr__(self):
return "(Loader<%s>)" % self.file_path
def get_absolute_path(path, base=""):
if base == "build":
extra = "../../../build"
elif base == "build_release":
extra = "../../../build_release"
elif base == "libs":
extra = "../../../libs"
elif base == "config":
extra = "../../../config"
else:
extra = ""
return os.path.normpath(os.path.join(DIR_PATH, extra, path))
def wait_for_server(port, delay=1.0):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", port]
while subprocess.call(cmd) != 0:
time.sleep(0.5)
time.sleep(delay)
def load_scenarios(args):
def load_scenarios(args, known_keys, suite_groups):
"""
Scans through folder structure starting with groups_root and
loads query scenarios.
@ -98,12 +114,11 @@ def load_scenarios(args):
for config_file in config_files:
log.debug("Processing config file %s", config_file)
config_name = config_file.split(".")[-2]
config_dict[config_name] = QuerySuite.Loader(
os.path.join(base, config_file))
config_dict[config_name] = Loader(os.path.join(base, config_file))
# validate that the scenario does not contain any illegal
# keys (defense against typos in file naming)
unknown_keys = set(config_dict) - QuerySuite.KNOWN_KEYS
# Validate that the scenario does not contain any illegal keys (defense
# against typos in file naming).
unknown_keys = set(config_dict) - known_keys
if unknown_keys:
raise Exception("Unknown QuerySuite config elements: '%r'" %
unknown_keys)
@ -114,18 +129,20 @@ def load_scenarios(args):
group_scenarios = OrderedDict()
for group in dir_content(args.root, os.path.isdir):
if group not in suite_groups: continue
log.info("Loading group: '%s'", group)
group_scenarios[group] = []
# Filter out hidden files: .gitignore, ...
files = dir_content(os.path.join(args.root, group),
os.path.isfile)
lambda x: os.path.isfile(x) and os.path.basename(x)[0] != ".")
# process group default config
# Process group default config.
group_config = {}
fill_config_dict(group_config, os.path.join(args.root, group),
[f for f in files if f.count(".") == 1])
# group files on scenario
# Group files on scenario.
for scenario_name, scenario_files in itertools.groupby(
filter(lambda f: f.count(".") == 2, sorted(files)),
lambda x: x.split(".")[0]):
@ -141,436 +158,6 @@ def load_scenarios(args):
return group_scenarios
class _QuerySuite:
"""
Executes a Query-based benchmark scenario. Query-based scenarios
consist of setup steps (Cypher queries) executed before the benchmark,
a single Cypher query that is benchmarked, and teardown steps
(Cypher queries) executed after the benchmark.
"""
# what the QuerySuite can work with
KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown",
"teardown", "common"}
FORMAT = ["{:>24}", "{:>28}", "{:>16}", "{:>18}", "{:>22}",
"{:>16}", "{:>16}"]
FULL_FORMAT = "".join(FORMAT) + "\n"
summary = FULL_FORMAT.format(
"group_name", "scenario_name", "parsing_time",
"planning_time", "plan_execution_time",
WALL_TIME, CPU_TIME)
def __init__(self, args):
if not APOLLO:
self.perf = Perf()
argp = ArgumentParser(description=__doc__)
argp.add_argument("--perf", help="Run perf on memgraph binary.",
action="store_true")
args, _ = argp.parse_known_args(args)
self.perf = Perf() if args.perf else None
class Loader:
"""
Loads file contents. Supported types are:
.py - executable that prints out Cypher queries
.cypher - contains Cypher queries in textual form
.json - contains a configuration
A Loader object is callable.
A call to it returns a generator that yields the loaded data
(Cypher queries, configuration). In that sense one
Loader is reusable. The generator approach makes it possible
to generate different queries each time a .py file is executed.
"""
def __init__(self, file_path):
self.file_path = file_path
def _queries(self, data):
""" Helper function for breaking down and filtering queries"""
for element in filter(
None, map(str.strip, data.replace("\n", " ").split(";"))):
yield element
def __call__(self):
""" Yields queries found in the given file_path one by one """
log.debug("Generating queries from file_path: %s",
self.file_path)
_, extension = os.path.splitext(self.file_path)
if extension == ".cypher":
with open(self.file_path) as f:
return self._queries(f.read())
elif extension == ".py":
return self._queries(subprocess.check_output(
["python3", self.file_path]).decode("ascii"))
elif extension == ".json":
with open(self.file_path) as f:
return [json.load(f)].__iter__()
else:
raise Exception("Unsupported filetype {} ".format(extension))
def __repr__(self):
return "(QuerySuite.Loader<%s>)" % self.file_path
def run(self, scenario, group_name, scenario_name, runner):
log.debug("QuerySuite.run() with scenario: %s", scenario)
scenario_config = scenario.get("config")
scenario_config = next(scenario_config()) if scenario_config else {}
def execute(config_name, num_client_workers=1):
queries = scenario.get(config_name)
start_time = time.time()
if queries:
r_val = runner.execute(queries(), num_client_workers)
else:
r_val = None
log.info("\t%s done in %.2f seconds" % (config_name,
time.time() - start_time))
return r_val
def add_measurement(dictionary, iteration, key):
if key in dictionary:
measurement = {"target": key,
"value": float(dictionary[key]),
"unit": "s",
"type": "time",
"iteration": iteration}
measurements.append(measurement)
try:
measurement_lists[key].append(float(dictionary[key]))
except:
pass
measurements = []
measurement_lists = defaultdict(list)
# Run the whole test 3 times because memgraph is sometimes
# consistently slow; this hack gives us a more reliable median.
for i in range(3):
pid = runner.start()
execute("setup")
# warmup phase
for _ in range(min(scenario_config.get("iterations", 1),
scenario_config.get("warmup", 2))):
execute("itersetup")
execute("run", scenario_config.get("num_client_workers", 1))
execute("iterteardown")
if self.perf:
self.perf.start(pid)
# TODO per scenario/run runner configuration
num_iterations = scenario_config.get("iterations", 1)
for iteration in range(num_iterations):
# TODO: if we didn't have itersetup it would be trivial to move the
# iteration loop into the bolt_client script; then we wouldn't have to
# start and stop the client for each iteration and the benchmark would
# most likely run faster.
execute("itersetup")
run_result = execute("run",
scenario_config.get("num_client_workers", 1))
add_measurement(run_result, iteration, WALL_TIME)
add_measurement(run_result, iteration, CPU_TIME)
for measurement in ["parsing_time",
"plan_execution_time",
"planning_time"] :
for i in range(len(run_result.get("metadatas", []))):
add_measurement(run_result["metadatas"][i], iteration,
measurement)
execute("iterteardown")
if self.perf:
self.perf.stop()
# TODO value outlier detection and warning across iterations
execute("teardown")
runner.stop()
self.append_scenario_summary(group_name, scenario_name,
measurement_lists, num_iterations)
return measurements
def append_scenario_summary(self, group_name, scenario_name,
measurement_lists, num_iterations):
self.summary += self.FORMAT[0].format(group_name)
self.summary += self.FORMAT[1].format(scenario_name)
for i, key in enumerate(("parsing_time", "planning_time",
"plan_execution_time", WALL_TIME, CPU_TIME)):
if key not in measurement_lists:
time = "-"
else:
# Median is used instead of average to avoid the effect of outliers.
time = "{:.10f}".format(median(measurement_lists[key]))
self.summary += self.FORMAT[i + 2].format(time)
self.summary += "\n"
def runners(self):
""" Which runners can execute a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
def groups(self):
""" Which groups can be executed by a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
class QuerySuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
return ["MemgraphRunner", "NeoRunner"]
def groups(self):
return ["1000_create", "unwind_create", "match", "dense_expand",
"expression", "aggregation", "return", "update", "delete"]
class QueryParallelSuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
return ["MemgraphRunner", "NeoRunner"]
def groups(self):
return ["aggregation_parallel", "create_parallel"]
# Database wrappers.
class Memgraph:
"""
Knows how to start and stop memgraph.
"""
def __init__(self, args, cpus):
self.log = logging.getLogger("MemgraphRunner")
argp = ArgumentParser("MemgraphArgumentParser", add_help=False)
argp.add_argument("--runner-bin",
default=get_absolute_path("memgraph", "build"))
argp.add_argument("--runner-config",
default=get_absolute_path("benchmarking_latency.conf", "config"))
argp.add_argument("--port", default="7687",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.database_bin = jail.get_process()
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
env = {"MEMGRAPH_CONFIG": self.args.runner_config}
database_args = ["--port", self.args.port]
# find executable path
runner_bin = self.args.runner_bin
if not os.path.exists(runner_bin):
# Apollo builds both debug and release binaries on diffs, so fall
# back to the release binary if the debug one doesn't exist.
runner_bin = get_absolute_path("memgraph", "build_release")
# start memgraph
self.database_bin.run(runner_bin, database_args, env=env, timeout=600)
wait_for_server(self.args.port)
return self.database_bin.get_pid() if not APOLLO else None
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
class Neo:
def __init__(self, args, cpus):
self.log = logging.getLogger("NeoRunner")
argp = ArgumentParser("NeoArgumentParser", add_help=False)
argp.add_argument("--runner-bin", default=get_absolute_path(
"neo4j/bin/neo4j", "libs"))
argp.add_argument("--runner-config",
default=get_absolute_path("config/neo4j.conf"))
argp.add_argument("--port", default="7687",
help="Database and client port")
argp.add_argument("--http-port", default="7474",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.database_bin = jail.get_process()
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
# create home directory
self.neo4j_home_path = tempfile.mkdtemp(dir="/dev/shm")
try:
os.symlink(os.path.join(get_absolute_path("neo4j", "libs"), "lib"),
os.path.join(self.neo4j_home_path, "lib"))
neo4j_conf_dir = os.path.join(self.neo4j_home_path, "conf")
neo4j_conf_file = os.path.join(neo4j_conf_dir, "neo4j.conf")
os.mkdir(neo4j_conf_dir)
shutil.copyfile(self.args.runner_config, neo4j_conf_file)
with open(neo4j_conf_file, "a") as f:
f.write("\ndbms.connector.bolt.listen_address=:" +
self.args.port + "\n")
f.write("\ndbms.connector.http.listen_address=:" +
self.args.http_port + "\n")
# environment
cwd = os.path.dirname(self.args.runner_bin)
env = {"NEO4J_HOME": self.neo4j_home_path}
self.database_bin.run(self.args.runner_bin, args=["console"],
env=env, timeout=600, cwd=cwd)
except:
shutil.rmtree(self.neo4j_home_path)
raise Exception("Couldn't run Neo4j!")
wait_for_server(self.args.http_port, 2.0)
return self.database_bin.get_pid() if not APOLLO else None
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
if os.path.exists(self.neo4j_home_path):
shutil.rmtree(self.neo4j_home_path)
class Postgres:
"""
Knows how to start and stop PostgreSQL.
"""
def __init__(self, args, cpus):
self.log = logging.getLogger("PostgresRunner")
argp = ArgumentParser("PostgresArgumentParser", add_help=False)
argp.add_argument("--init-bin", default=get_absolute_path(
"postgresql/bin/initdb", "libs"))
argp.add_argument("--runner-bin", default=get_absolute_path(
"postgresql/bin/postgres", "libs"))
argp.add_argument("--port", default="5432",
help="Database and client port")
self.log.info("Initializing Runner with arguments %r", args)
self.args, _ = argp.parse_known_args(args)
self.username = "macro_benchmark"
self.database_bin = jail.get_process()
self.database_bin.set_cpus(cpus)
def start(self):
self.log.info("start")
self.data_path = tempfile.mkdtemp(dir="/dev/shm")
init_args = ["-D", self.data_path, "-U", self.username]
self.database_bin.run_and_wait(self.args.init_bin, init_args)
# args
runner_args = ["-D", self.data_path, "-c", "port=" + self.args.port,
"-c", "ssl=false", "-c", "max_worker_processes=1"]
try:
self.database_bin.run(self.args.runner_bin, args=runner_args,
timeout=600)
except:
shutil.rmtree(self.data_path)
raise Exception("Couldn't run PostgreSQL!")
wait_for_server(self.args.port)
return self.database_bin.get_pid() if not APOLLO else None
def stop(self):
self.database_bin.send_signal(jail.SIGTERM)
self.database_bin.wait()
if os.path.exists(self.data_path):
shutil.rmtree(self.data_path)
class _HarnessClientRunner:
"""
Knows how to start and stop a database (backend) and a client frontend
(bolt), and how to execute a Cypher query.
Execution returns benchmarking data (execution times, memory
usage etc.).
A derived class should implement the start method and initialise the
database_bin and bolt_client members of type Process.
"""
def __init__(self, args, database, cpus=None):
if cpus is None: cpus = [2, 3]
self.log = logging.getLogger("_HarnessClientRunner")
self.database = database
argp = ArgumentParser("RunnerArgumentParser", add_help=False)
self.args, _ = argp.parse_known_args()
self.bolt_client = jail.get_process()
self.bolt_client.set_cpus(cpus)
def start(self):
self.database.start()
def execute(self, queries, num_client_workers):
self.log.debug("execute('%s')", str(queries))
client_path = "tests/macro_benchmark/harness_client"
client = get_absolute_path(client_path, "build")
if not os.path.exists(client):
# Apollo builds both debug and release binaries on diffs, so fall
# back to the release client if the debug one doesn't exist.
client = get_absolute_path(client_path, "build_release")
queries_fd, queries_path = tempfile.mkstemp()
try:
queries_file = os.fdopen(queries_fd, "w")
queries_file.write("\n".join(queries))
queries_file.close()
except:
queries_file.close()
os.remove(queries_path)
raise Exception("Writing queries to temporary file failed")
output_fd, output = tempfile.mkstemp()
os.close(output_fd)
client_args = ["--port", self.database.args.port,
"--num-workers", str(num_client_workers),
"--output", output]
cpu_time_start = self.database.database_bin.get_usage()["cpu"]
# TODO make the timeout configurable per query or something
return_code = self.bolt_client.run_and_wait(
client, client_args, timeout=600, stdin=queries_path)
cpu_time_end = self.database.database_bin.get_usage()["cpu"]
os.remove(queries_path)
if return_code != 0:
with open(self.bolt_client.get_stderr()) as f:
stderr = f.read()
self.log.error("Error while executing queries '%s'. "
"Failed with return_code %d and stderr:\n%s",
str(queries), return_code, stderr)
raise Exception("BoltClient execution failed")
with open(output) as f:
data = json.loads(f.read())
data[CPU_TIME] = cpu_time_end - cpu_time_start
os.remove(output)
return data
def stop(self):
self.log.info("stop")
self.bolt_client.wait()
self.database.stop()
class MemgraphRunner(_HarnessClientRunner):
def __init__(self, args, client_cpus=None, database_cpus=None):
if database_cpus is None: database_cpus = [1]
database = Memgraph(args, database_cpus)
super(MemgraphRunner, self).__init__(args, database, cpus=client_cpus)
class NeoRunner(_HarnessClientRunner):
def __init__(self, args, client_cpus=None, database_cpus=None):
if database_cpus is None: database_cpus = [1]
database = Neo(args, database_cpus)
super(NeoRunner, self).__init__(args, database, cpus=client_cpus)
def main():
argp = ArgumentParser(description=__doc__)
# positional, mandatory args
@ -600,7 +187,8 @@ def main():
# Create suites.
suites = {"QuerySuite": QuerySuite,
"QueryParallelSuite": QueryParallelSuite}
"QueryParallelSuite": QueryParallelSuite,
"LongRunningSuite": LongRunningSuite}
if args.suite not in suites:
raise Exception(
"Suite '{}' isn't registered. Registered suites are: {}".format(
@ -608,14 +196,15 @@ def main():
suite = suites[args.suite](remaining_args)
# Load scenarios.
group_scenarios = load_scenarios(remaining_args)
group_scenarios = load_scenarios(
remaining_args, suite.KNOWN_KEYS, suite.groups())
log.info("Loaded %d groups, with a total of %d scenarios",
len(group_scenarios),
sum([len(x) for x in group_scenarios.values()]))
# Create runners.
runners = {"MemgraphRunner": MemgraphRunner, "NeoRunner": NeoRunner}
if args.runner not in suite.runners():
# Create runner.
runners = suite.runners()
if args.runner not in runners:
raise Exception("Runner '{}' not registered for suite '{}'".format(
args.runner, args.suite))
runner = runners[args.runner](remaining_args)

View File

@ -1,4 +1,5 @@
#!/usr/bin/python3
import atexit
import json
import os
@ -12,21 +13,17 @@ import uuid
from signal import *
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
STORAGE_DIR = os.path.join(SCRIPT_DIR, ".storage")
class ProcessException(Exception):
pass
class StorageException(Exception):
pass
class Process:
def __init__(self, tid):
self._tid = tid
@ -36,7 +33,8 @@ class Process:
self._usage = {}
self._files = []
def run(self, binary, args = None, env = None, timeout = 120, stdin = "/dev/null", cwd = "."):
def run(self, binary, args=None, env=None, timeout=120,
stdin="/dev/null", cwd="."):
if args is None: args = []
if env is None: env = {}
# don't start a new process if one is already running
@ -59,8 +57,8 @@ class Process:
self._timeout = timeout
# start process
self._proc = subprocess.Popen(exe, env = env, cwd = cwd,
stdin = open(stdin, "r"))
self._proc = subprocess.Popen(exe, env=env, cwd=cwd,
stdin=open(stdin, "r"))
def run_and_wait(self, *args, **kwargs):
check = kwargs.pop("check", True)
@ -92,7 +90,7 @@ class Process:
return self._usage
# this is implemented only in the real API
def set_cpus(self, cpus, hyper = True):
def set_cpus(self, cpus, hyper=True):
s = "out" if not hyper else ""
sys.stderr.write("WARNING: Trying to set cpus for {} to "
"{} with{} hyperthreading!\n".format(str(self), cpus, s))
@ -113,7 +111,7 @@ class Process:
raise ProcessException
return self._proc.pid
def _set_usage(self, val, name, only_value = False):
def _set_usage(self, val, name, only_value=False):
self._usage[name] = val
if only_value: return
maxname = "max_" + name
@ -138,10 +136,12 @@ class Process:
except:
return
# for a description of these fields see: man proc; man times
cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec, data_stat[13:17]))
cpu_time = sum(map(lambda x: int(x) / self._ticks_per_sec,
data_stat[13:17]))
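# Fields 14-17 of /proc/<pid>/stat are utime, stime, cutime and cstime in
# clock ticks (hence the division by _ticks_per_sec); the first three fields
# of /proc/<pid>/statm are total, resident and shared pages (hence the
# multiplication by the page size below).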
self._set_usage(cpu_time, "cpu", only_value = True)
self._set_usage(int(data_stat[19]), "threads")
mem_vm, mem_res, mem_shr = map(lambda x: int(x) * self._page_size // 1024, data_statm[:3])
mem_vm, mem_res, mem_shr = map(
lambda x: int(x) * self._page_size // 1024, data_statm[:3])
self._set_usage(mem_res, "memory")
def _watchdog(self):
@ -169,13 +169,14 @@ def _usage_updater():
proc._do_background_tasks()
time.sleep(0.1)
_thread = threading.Thread(target = _usage_updater, daemon = True)
_thread = threading.Thread(target=_usage_updater, daemon=True)
_thread.start()
if not os.path.exists(STORAGE_DIR):
os.mkdir(STORAGE_DIR)
_storage_name = os.path.join(STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
_storage_name = os.path.join(
STORAGE_DIR, time.strftime("%Y%m%d%H%M%S") + ".json")
_storage_file = open(_storage_name, "w")
@atexit.register

View File

@ -0,0 +1,129 @@
import logging
import os
import time
import itertools
import json
from argparse import ArgumentParser
from collections import defaultdict
from statistics import median
from common import get_absolute_path
from databases import Memgraph, Neo
from clients import QueryClient, LongRunningClient
log = logging.getLogger(__name__)
class LongRunningSuite:
KNOWN_KEYS = {"config", "setup", "run"}
def __init__(self, args):
argp = ArgumentParser("LongRunningSuiteArgumentParser")
argp.add_argument("--num-client-workers", default=4)
self.args, _ = argp.parse_known_args(args)
def run(self, scenario, group_name, scenario_name, runner):
runner.start()
# This suite allows empty lines in setup. Those lines separate query
# groups. It is guaranteed that groups will be executed sequentially,
# but queries in each group are possibly executed concurrently.
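# For illustration (hypothetical content, not shipped with the harness): a
# setup that yields
#
#   CREATE INDEX ON :User(id)
#   <empty query>
#   CREATE (:User {id: 1})
#   CREATE (:User {id: 2})
#
# forms two groups; the index creation finishes before the two CREATE
# statements, which may then run concurrently.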
query_groups = [[]]
for query in scenario.get("setup")():
if query == "":
query_groups.append([])
else:
query_groups[-1].append(query)
if query_groups[-1] == []:
query_groups.pop()
log.info("Executing {} query groups in setup"
.format(len(query_groups)))
for i, queries in enumerate(query_groups):
start_time = time.time()
# TODO: number of threads configurable
runner.setup(queries, self.args.num_client_workers)
log.info("\t{}. group imported in done in {:.2f} seconds".format(
i + 1, time.time() - start_time))
config = next(scenario.get("config")())
duration = config["duration"]
log.info("Executing run for {} seconds with {} client workers".format(
duration, self.args.num_client_workers))
# TODO: number of threads configurable
results = runner.run(next(scenario.get("run")()), duration,
self.args.num_client_workers)
runner.stop()
measurements = []
for result in results:
print(result["num_executed_queries"], result["elapsed_time"])
# TODO: Revise this.
measurements.append({
"target": "throughput",
"value": result["num_executed_queries"] / result["elapsed_time"],
"unit": "queries per second",
"type": "throughput"})
self.summary = "Throughtput: " + str(measurements[-1]["value"])
return measurements
def runners(self):
return { "MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner }
def groups(self):
return ["pokec"]
class _LongRunningRunner:
def __init__(self, args, database):
self.log = logging.getLogger("_LongRunningRunner")
self.database = database
self.query_client = QueryClient(args)
self.long_running_client = LongRunningClient(args)
def start(self):
self.database.start()
def setup(self, queries, num_client_workers):
return self.query_client(queries, self.database, num_client_workers)
def run(self, config, duration, num_client_workers):
return self.long_running_client(
config, self.database, duration, num_client_workers)
def stop(self):
self.log.info("stop")
self.database.stop()
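# A rough sketch of how LongRunningSuite drives this runner (method names are
# the ones defined above; the scenario pieces come from load_scenarios):
#
#   runner.start()                                 # boot the database
#   for group in setup_query_groups:
#       runner.setup(group, num_client_workers)    # sequential import
#   results = runner.run(run_spec, duration, num_client_workers)
#   runner.stop()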
class MemgraphRunner(_LongRunningRunner):
"""
Configures the memgraph database for LongRunningSuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("MemgraphRunnerArgumentParser")
# TODO: change default config
argp.add_argument("--runner-config", default=get_absolute_path(
"benchmarking_throughput.conf", "config"),
help="Path to memgraph config")
argp.add_argument("--num-workers", help="Number of workers")
self.args, remaining_args = argp.parse_known_args(args)
database = Memgraph(remaining_args, self.args.runner_config,
self.args.num_workers)
super(MemgraphRunner, self).__init__(remaining_args, database)
class NeoRunner(_LongRunningRunner):
"""
Configures the neo4j database for LongRunningSuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("NeoRunnerArgumentParser")
argp.add_argument("--runner-config",
default=get_absolute_path(
"config/neo4j_long_running.conf"),
help="Path to neo config file")
self.args, remaining_args = argp.parse_known_args(args)
database = Neo(remaining_args, self.args.runner_config, [1])
super(NeoRunner, self).__init__(remaining_args, database)

View File

@ -1,32 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from pathlib import Path
import subprocess
import signal
class Perf():
def __init__(self):
self.first = True
self.max_frequency = Path(
"/proc/sys/kernel/perf_event_max_sample_rate").read_text().strip()
# Check if lbr is available.
status = subprocess.call(
"perf record --call-graph=lbr -a -g sleep 0.0000001".split())
self.call_graph_technique = "lbr" if not status else "dwarf"
def start(self, pid, frequency=None):
if frequency is None: frequency = self.max_frequency
append = "-A" if not self.first else ""
self.first = False
perf_command = "perf record --call-graph={} -F {} -p {} -g {}".format(
self.call_graph_technique, frequency, pid, append).split()
self.perf_proc = subprocess.Popen(perf_command)
def stop(self):
self.perf_proc.send_signal(signal.SIGINT)
self.perf_proc.wait()

View File

@ -0,0 +1,209 @@
import logging
import os
import time
import itertools
import json
from argparse import ArgumentParser
from collections import defaultdict
import tempfile
from statistics import median
from common import get_absolute_path, WALL_TIME, CPU_TIME
from databases import Memgraph, Neo
from clients import QueryClient
log = logging.getLogger(__name__)
class _QuerySuite:
"""
Executes a Query-based benchmark scenario. Query-based scenarios
consist of setup steps (Cypher queries) executed before the benchmark,
a single Cypher query that is benchmarked, and teardown steps
(Cypher queries) executed after the benchmark.
"""
# what the QuerySuite can work with
KNOWN_KEYS = {"config", "setup", "itersetup", "run", "iterteardown",
"teardown", "common"}
FORMAT = ["{:>24}", "{:>28}", "{:>16}", "{:>18}", "{:>22}",
"{:>16}", "{:>16}"]
FULL_FORMAT = "".join(FORMAT) + "\n"
summary = FULL_FORMAT.format(
"group_name", "scenario_name", "parsing_time",
"planning_time", "plan_execution_time",
WALL_TIME, CPU_TIME)
def __init__(self, args):
pass
def run(self, scenario, group_name, scenario_name, runner):
log.debug("QuerySuite.run() with scenario: %s", scenario)
scenario_config = scenario.get("config")
scenario_config = next(scenario_config()) if scenario_config else {}
def execute(config_name, num_client_workers=1):
queries = scenario.get(config_name)
start_time = time.time()
if queries:
r_val = runner.execute(queries(), num_client_workers)
else:
r_val = None
log.info("\t%s done in %.2f seconds" % (config_name,
time.time() - start_time))
return r_val
def add_measurement(dictionary, iteration, key):
if key in dictionary:
measurement = {"target": key,
"value": float(dictionary[key]),
"unit": "s",
"type": "time",
"iteration": iteration}
measurements.append(measurement)
try:
measurement_lists[key].append(float(dictionary[key]))
except:
pass
measurements = []
measurement_lists = defaultdict(list)
# Run the whole test 3 times because memgraph is sometimes
# consistently slow; this hack gives us a more reliable median.
for i in range(3):
runner.start()
execute("setup")
# warmup phase
for _ in range(min(scenario_config.get("iterations", 1),
scenario_config.get("warmup", 2))):
execute("itersetup")
execute("run", scenario_config.get("num_client_workers", 1))
execute("iterteardown")
# TODO per scenario/run runner configuration
num_iterations = scenario_config.get("iterations", 1)
for iteration in range(num_iterations):
# TODO: if we didn't have itersetup it would be trivial to move the
# iteration loop into the bolt_client script; then we wouldn't have to
# start and stop the client for each iteration and the benchmark would
# most likely run faster.
execute("itersetup")
run_result = execute("run",
scenario_config.get("num_client_workers", 1))
add_measurement(run_result, iteration, WALL_TIME)
add_measurement(run_result, iteration, CPU_TIME)
for measurement in ["parsing_time",
"plan_execution_time",
"planning_time"] :
for i in range(len(run_result.get("metadatas", []))):
add_measurement(run_result["metadatas"][i], iteration,
measurement)
execute("iterteardown")
# TODO value outlier detection and warning across iterations
execute("teardown")
runner.stop()
self.append_scenario_summary(group_name, scenario_name,
measurement_lists, num_iterations)
return measurements
def append_scenario_summary(self, group_name, scenario_name,
measurement_lists, num_iterations):
self.summary += self.FORMAT[0].format(group_name)
self.summary += self.FORMAT[1].format(scenario_name)
for i, key in enumerate(("parsing_time", "planning_time",
"plan_execution_time", WALL_TIME, CPU_TIME)):
if key not in measurement_lists:
time = "-"
else:
# Median is used instead of average to avoid the effect of outliers.
time = "{:.10f}".format(median(measurement_lists[key]))
self.summary += self.FORMAT[i + 2].format(time)
self.summary += "\n"
def runners(self):
""" Which runners can execute a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
def groups(self):
""" Which groups can be executed by a QuerySuite scenario """
assert False, "This is a base class, use one of derived suites"
class QuerySuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
return {"MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner}
def groups(self):
return ["1000_create", "unwind_create", "match", "dense_expand",
"expression", "aggregation", "return", "update", "delete"]
class QueryParallelSuite(_QuerySuite):
def __init__(self, args):
_QuerySuite.__init__(self, args)
def runners(self):
# TODO: We should use different runners which will use more threads.
return {"MemgraphRunner" : MemgraphRunner, "NeoRunner" : NeoRunner}
def groups(self):
return ["aggregation_parallel", "create_parallel"]
class _QueryRunner:
"""
Knows how to start and stop a database (backend) and a client frontend
(bolt), and how to execute a Cypher query.
Execution returns benchmarking data (execution times, memory
usage etc.).
"""
def __init__(self, args, database):
self.log = logging.getLogger("_HarnessClientRunner")
self.database = database
self.query_client = QueryClient(args, [2, 3])
def start(self):
self.database.start()
def execute(self, queries, num_client_workers):
return self.query_client(queries, self.database, num_client_workers)
def stop(self):
self.log.info("stop")
self.database.stop()
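# The dictionary returned by execute() is expected to carry at least the
# wall-clock and CPU time of the run plus a "metadatas" list with per-query
# server-side timings (parsing_time, planning_time, plan_execution_time);
# _QuerySuite.run() above builds its measurements from exactly those keys.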
class MemgraphRunner(_QueryRunner):
"""
Configures memgraph database for QuerySuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("MemgraphRunnerArgumentParser")
argp.add_argument("--runner-config", default=get_absolute_path(
"benchmarking_latency.conf", "config"),
help="Path to memgraph config")
argp.add_argument("--num-workers", help="Number of workers")
self.args, remaining_args = argp.parse_known_args(args)
database = Memgraph(remaining_args, self.args.runner_config,
self.args.num_workers, [1])
super(MemgraphRunner, self).__init__(remaining_args, database)
class NeoRunner(_QueryRunner):
"""
Configures neo4j database for QuerySuite execution.
"""
def __init__(self, args):
argp = ArgumentParser("NeoRunnerArgumentParser")
argp.add_argument("--runner-config",
default=get_absolute_path("config/neo4j.conf"),
help="Path to neo config file")
self.args, remaining_args = argp.parse_known_args(args)
database = Neo(remaining_args, self.args.runner_config, [1])
super(NeoRunner, self).__init__(remaining_args, database)

View File

@ -1,2 +0,0 @@
*
!.gitignore

View File

@ -17,7 +17,7 @@ mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=release ..
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark__harness_client
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark
cd ../tools/apollo

View File

@ -21,7 +21,7 @@ mkdir build_release
cd build_release
cmake -DCMAKE_BUILD_TYPE=release ..
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark__harness_client
TIMEOUT=1000 make -j$THREADS memgraph_link_target memgraph__macro_benchmark
cd ../../parent
@ -30,7 +30,7 @@ TIMEOUT=600 ./init
cd build
cmake -DCMAKE_BUILD_TYPE=release ..
TIMEOUT=1000 make -j$THREADS memgraph_link_target parent__macro_benchmark__harness_client
TIMEOUT=1000 make -j$THREADS memgraph_link_target parent__macro_benchmark
cd ../../memgraph/tools/apollo