Remove PostgreSQL
Reviewers: buda, teon.banek Reviewed By: buda Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1326
This commit is contained in:
parent
ad28ac5e7c
commit
2924746f20
@ -98,13 +98,6 @@ rm -rf neo4j
|
||||
mv neo4j-community-3.2.3 neo4j
|
||||
rm neo4j.tar.gz
|
||||
|
||||
# postgresql
|
||||
wget -nv http://deps.memgraph.io/postgresql-9.6.5-1-linux-x64-binaries.tar.gz -O postgres.tar.gz
|
||||
tar -xzf postgres.tar.gz
|
||||
rm -rf postgresql
|
||||
mv pgsql postgresql
|
||||
rm postgres.tar.gz
|
||||
|
||||
# nlohmann json
|
||||
# We wget header instead of cloning repo since repo is huge (lots of test data).
|
||||
# We use head on Sep 1, 2017 instead of last release since it was long time ago.
|
||||
|
@ -34,7 +34,6 @@ for i in range(NUM_MACHINES):
|
||||
"jail_service.py",
|
||||
"card_fraud/card_fraud.py",
|
||||
"card_fraud/snapshots/worker_" + str(i),
|
||||
"../../libs/postgresql/lib",
|
||||
] + additional,
|
||||
"outfile_paths": outfile_paths,
|
||||
"parallel_run": "distributed__card_fraud",
|
||||
|
@ -5,10 +5,6 @@ get_filename_component(test_type ${CMAKE_CURRENT_SOURCE_DIR} NAME)
|
||||
file(GLOB_RECURSE test_type_cpps *.cpp)
|
||||
message(STATUS "Available ${test_type} cpp files are: ${test_type_cpps}")
|
||||
|
||||
add_library(postgres SHARED IMPORTED)
|
||||
set_property(TARGET postgres PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${libs_dir}/postgresql/include)
|
||||
set_property(TARGET postgres PROPERTY IMPORTED_LOCATION ${libs_dir}/postgresql/lib/libpq.so)
|
||||
|
||||
# add target that depends on all other targets
|
||||
set(all_targets_target memgraph__${test_type})
|
||||
add_custom_target(${all_targets_target})
|
||||
@ -30,7 +26,7 @@ foreach(test_cpp ${test_type_cpps})
|
||||
set_target_properties(${target_name} PROPERTIES OUTPUT_NAME ${exec_name})
|
||||
|
||||
# link libraries
|
||||
target_link_libraries(${target_name} memgraph_lib postgres)
|
||||
target_link_libraries(${target_name} memgraph_lib)
|
||||
|
||||
# add target to dependencies
|
||||
add_dependencies(${all_targets_target} ${target_name})
|
||||
|
@ -9,7 +9,6 @@
|
||||
- ../../build_release/memgraph # memgraph release binary
|
||||
- ../../config # directory with config files
|
||||
- ../../build_release/tests/macro_benchmark # macro benchmark client binaries
|
||||
- ../../libs/postgresql/lib # postgresql libs dir (for client binaries)
|
||||
outfile_paths: &MACRO_BENCHMARK_OUTFILE_PATHS
|
||||
- \./memgraph/tests/macro_benchmark/\.harness_summary
|
||||
|
||||
|
@ -5,11 +5,12 @@
|
||||
|
||||
#include "gflags/gflags.h"
|
||||
|
||||
#include "long_running_common.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "stats/stats_rpc_messages.hpp"
|
||||
#include "threading/sync/rwlock.hpp"
|
||||
|
||||
#include "long_running_common.hpp"
|
||||
|
||||
// TODO(mtomic): this sucks but I don't know a different way to make it work
|
||||
#include "boost/archive/binary_iarchive.hpp"
|
||||
#include "boost/archive/binary_oarchive.hpp"
|
||||
@ -38,19 +39,19 @@ void UpdateStats() {
|
||||
num_edges.Set(2 * num_transactions);
|
||||
}
|
||||
|
||||
int64_t NumNodesWithLabel(BoltClient &client, std::string label) {
|
||||
int64_t NumNodesWithLabel(Client &client, std::string label) {
|
||||
std::string query = fmt::format("MATCH (u :{}) RETURN count(u)", label);
|
||||
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
|
||||
return result.first.records[0][0].ValueInt();
|
||||
}
|
||||
|
||||
int64_t MaxIdForLabel(BoltClient &client, std::string label) {
|
||||
int64_t MaxIdForLabel(Client &client, std::string label) {
|
||||
std::string query = fmt::format("MATCH (u :{}) RETURN max(u.id)", label);
|
||||
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
|
||||
return result.first.records[0][0].ValueInt();
|
||||
}
|
||||
|
||||
void CreateIndex(BoltClient &client, const std::string &label,
|
||||
void CreateIndex(Client &client, const std::string &label,
|
||||
const std::string &property) {
|
||||
LOG(INFO) << fmt::format("Creating indexes for :{}({})...", label, property);
|
||||
ExecuteNTimesTillSuccess(
|
||||
@ -342,7 +343,11 @@ int main(int argc, char **argv) {
|
||||
stats::InitStatsLogging(
|
||||
fmt::format("client.long_running.{}.{}", FLAGS_group, FLAGS_scenario));
|
||||
|
||||
BoltClient client(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password);
|
||||
Endpoint endpoint(FLAGS_address, FLAGS_port);
|
||||
Client client;
|
||||
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
LOG(FATAL) << "Couldn't connect to " << endpoint;
|
||||
}
|
||||
|
||||
num_pos.store(NumNodesWithLabel(client, "Pos"));
|
||||
num_cards.store(NumNodesWithLabel(client, "Card"));
|
||||
|
@ -11,11 +11,11 @@
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
namespace {
|
||||
using communication::bolt::Client;
|
||||
using communication::bolt::DecodedValue;
|
||||
using io::network::Endpoint;
|
||||
|
||||
void PrintJsonDecodedValue(std::ostream &os,
|
||||
const communication::bolt::DecodedValue &value) {
|
||||
using communication::bolt::DecodedValue;
|
||||
void PrintJsonDecodedValue(std::ostream &os, const DecodedValue &value) {
|
||||
switch (value.type()) {
|
||||
case DecodedValue::Type::Null:
|
||||
os << "null";
|
||||
@ -55,9 +55,8 @@ void PrintJsonDecodedValue(std::ostream &os,
|
||||
}
|
||||
}
|
||||
|
||||
template <typename TClient>
|
||||
std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
|
||||
TClient &client, const std::string &query,
|
||||
Client &client, const std::string &query,
|
||||
const std::map<std::string, communication::bolt::DecodedValue> ¶ms,
|
||||
int max_attempts) {
|
||||
static thread_local std::mt19937 pseudo_rand_gen_{std::random_device{}()};
|
||||
@ -81,5 +80,3 @@ std::pair<communication::bolt::QueryData, int> ExecuteNTimesTillSuccess(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -2,14 +2,13 @@
|
||||
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "bolt_client.hpp"
|
||||
#include "common.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "stats/metrics.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "utils/network.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
#include "common.hpp"
|
||||
|
||||
const int MAX_RETRIES = 30;
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
@ -28,8 +27,12 @@ auto &serialization_errors = stats::GetCounter("serialization_errors");
|
||||
|
||||
class TestClient {
|
||||
public:
|
||||
TestClient()
|
||||
: client_(FLAGS_address, FLAGS_port, FLAGS_username, FLAGS_password) {}
|
||||
TestClient() {
|
||||
Endpoint endpoint(FLAGS_address, FLAGS_port);
|
||||
if (!client_.Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
LOG(FATAL) << "Couldn't connect to " << endpoint;
|
||||
}
|
||||
}
|
||||
|
||||
virtual ~TestClient() {}
|
||||
|
||||
@ -95,7 +98,7 @@ class TestClient {
|
||||
std::thread runner_thread_;
|
||||
|
||||
private:
|
||||
BoltClient client_;
|
||||
Client client_;
|
||||
};
|
||||
|
||||
void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
|
||||
|
@ -13,18 +13,13 @@
|
||||
#include <glog/logging.h>
|
||||
#include <json/json.hpp>
|
||||
|
||||
#include "bolt_client.hpp"
|
||||
#include "common.hpp"
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/socket.hpp"
|
||||
#include "long_running_common.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/network.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
#include "long_running_common.hpp"
|
||||
|
||||
using communication::bolt::DecodedEdge;
|
||||
using communication::bolt::DecodedValue;
|
||||
using communication::bolt::DecodedVertex;
|
||||
@ -223,14 +218,14 @@ class PokecClient : public TestClient {
|
||||
}
|
||||
};
|
||||
|
||||
int64_t NumNodes(BoltClient &client, const std::string &label) {
|
||||
int64_t NumNodes(Client &client, const std::string &label) {
|
||||
auto result = ExecuteNTimesTillSuccess(
|
||||
client, "MATCH (n :" + label + ") RETURN COUNT(n) as cnt", {},
|
||||
MAX_RETRIES);
|
||||
return result.first.records[0][0].ValueInt();
|
||||
}
|
||||
|
||||
std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
|
||||
std::vector<int64_t> Neighbours(Client &client, const std::string &label,
|
||||
int64_t id) {
|
||||
auto result = ExecuteNTimesTillSuccess(client,
|
||||
"MATCH (n :" + label +
|
||||
@ -244,8 +239,7 @@ std::vector<int64_t> Neighbours(BoltClient &client, const std::string &label,
|
||||
return ret;
|
||||
}
|
||||
|
||||
std::vector<int64_t> IndependentSet(BoltClient &client,
|
||||
const std::string &label) {
|
||||
std::vector<int64_t> IndependentSet(Client &client, const std::string &label) {
|
||||
const int64_t num_nodes = NumNodes(client, label);
|
||||
std::vector<int64_t> independent_nodes_ids;
|
||||
std::vector<int64_t> ids;
|
||||
@ -282,8 +276,11 @@ int main(int argc, char **argv) {
|
||||
std::cin >> config;
|
||||
|
||||
auto independent_nodes_ids = [&] {
|
||||
BoltClient client(utils::ResolveHostname(FLAGS_address), FLAGS_port,
|
||||
FLAGS_username, FLAGS_password);
|
||||
Endpoint endpoint(utils::ResolveHostname(FLAGS_address), FLAGS_port);
|
||||
Client client;
|
||||
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
LOG(FATAL) << "Couldn't connect to " << endpoint;
|
||||
}
|
||||
return IndependentSet(client, INDEPENDENT_LABEL);
|
||||
}();
|
||||
|
||||
|
@ -1,108 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <glog/logging.h>
|
||||
#include <libpq-fe.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "utils/exceptions.hpp"
|
||||
|
||||
using communication::bolt::QueryData;
|
||||
|
||||
namespace postgres {
|
||||
|
||||
class ClientException : public utils::BasicException {
|
||||
using utils::BasicException::BasicException;
|
||||
};
|
||||
|
||||
class ClientQueryException : public ClientException {
|
||||
public:
|
||||
using ClientException::ClientException;
|
||||
ClientQueryException() : ClientException("Couldn't execute query!") {}
|
||||
};
|
||||
|
||||
class Client {
|
||||
public:
|
||||
Client(const std::string &host, const std::string &port,
|
||||
const std::string &username, const std::string &password,
|
||||
const std::string &database = "") {
|
||||
// https://www.postgresql.org/docs/9.4/static/libpq-connect.html#LIBPQ-PARAMKEYWORDS
|
||||
std::string pass = "";
|
||||
if (password != "") {
|
||||
pass = "password=" + password;
|
||||
}
|
||||
std::string conninfo =
|
||||
fmt::format("host={} port={} user={} {} dbname={} sslmode=disable",
|
||||
host, port, username, pass, database);
|
||||
|
||||
// Make a connection to the database.
|
||||
connection_ = PQconnectdb(conninfo.c_str());
|
||||
|
||||
// Check to see that the backend connection was successfully made
|
||||
if (PQstatus(connection_) != CONNECTION_OK) {
|
||||
throw ClientException(PQerrorMessage(connection_));
|
||||
}
|
||||
}
|
||||
|
||||
QueryData Execute(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::DecodedValue>
|
||||
¶meters) {
|
||||
QueryData ret;
|
||||
|
||||
CHECK(parameters.size() == 0U) << "Parameters not yet supported";
|
||||
DLOG(INFO) << "Sending run message with statement: '" << query << "'";
|
||||
|
||||
result_ = PQexec(connection_, query.c_str());
|
||||
if (PQresultStatus(result_) == PGRES_TUPLES_OK) {
|
||||
// get fields
|
||||
int num_fields = PQnfields(result_);
|
||||
for (int i = 0; i < num_fields; ++i) {
|
||||
ret.fields.push_back(PQfname(result_, i));
|
||||
}
|
||||
|
||||
// get records
|
||||
int num_records = PQntuples(result_);
|
||||
ret.records.resize(num_records);
|
||||
for (int i = 0; i < num_records; ++i) {
|
||||
for (int j = 0; j < num_fields; ++j) {
|
||||
ret.records[i].push_back(std::string(PQgetvalue(result_, i, j)));
|
||||
}
|
||||
}
|
||||
|
||||
// get metadata
|
||||
ret.metadata.insert({"status", std::string(PQcmdStatus(result_))});
|
||||
ret.metadata.insert({"rows_affected", std::string(PQcmdTuples(result_))});
|
||||
} else if (PQresultStatus(result_) != PGRES_COMMAND_OK) {
|
||||
throw ClientQueryException(PQerrorMessage(connection_));
|
||||
}
|
||||
|
||||
PQclear(result_);
|
||||
result_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
void Close() {
|
||||
if (result_ != nullptr) {
|
||||
PQclear(result_);
|
||||
result_ = nullptr;
|
||||
}
|
||||
if (connection_ != nullptr) {
|
||||
PQfinish(connection_);
|
||||
connection_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
~Client() { Close(); }
|
||||
|
||||
private:
|
||||
PGconn *connection_{nullptr};
|
||||
PGresult *result_{nullptr};
|
||||
};
|
||||
}
|
@ -4,27 +4,21 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/decoder/decoded_value.hpp"
|
||||
#include "threading/sync/spinlock.hpp"
|
||||
#include "utils/algorithm.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
#include "bolt_client.hpp"
|
||||
#include "common.hpp"
|
||||
//#include "postgres_client.hpp"
|
||||
|
||||
DEFINE_string(protocol, "bolt", "Protocol to use (available: bolt, postgres)");
|
||||
DEFINE_int32(num_workers, 1, "Number of workers");
|
||||
DEFINE_string(input, "", "Input file");
|
||||
DEFINE_string(output, "", "Output file");
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 0, "Server port");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_string(database, "", "Database for the database");
|
||||
|
||||
using communication::bolt::DecodedValue;
|
||||
|
||||
@ -49,11 +43,8 @@ void PrintSummary(
|
||||
os << "}\n";
|
||||
}
|
||||
|
||||
template <typename ClientT>
|
||||
void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
|
||||
std::ostream &ostream, std::string &address, uint16_t port,
|
||||
std::string &username, std::string &password,
|
||||
std::string &database) {
|
||||
void ExecuteQueries(const std::vector<std::string> &queries,
|
||||
std::ostream &ostream) {
|
||||
std::vector<std::thread> threads;
|
||||
|
||||
SpinLock spinlock;
|
||||
@ -64,9 +55,13 @@ void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
|
||||
|
||||
utils::Timer timer;
|
||||
|
||||
for (int i = 0; i < num_workers; ++i) {
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
threads.push_back(std::thread([&]() {
|
||||
ClientT client(address, port, username, password, database);
|
||||
Endpoint endpoint(FLAGS_address, FLAGS_port);
|
||||
Client client;
|
||||
if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
|
||||
LOG(FATAL) << "Couldn't connect to " << endpoint;
|
||||
}
|
||||
|
||||
std::string str;
|
||||
while (true) {
|
||||
@ -91,7 +86,7 @@ void ExecuteQueries(const std::vector<std::string> &queries, int num_workers,
|
||||
}));
|
||||
}
|
||||
|
||||
for (int i = 0; i < num_workers; ++i) {
|
||||
for (int i = 0; i < FLAGS_num_workers; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
@ -121,13 +116,6 @@ int main(int argc, char **argv) {
|
||||
ostream = &ofile;
|
||||
}
|
||||
|
||||
uint16_t port = FLAGS_port;
|
||||
if (FLAGS_protocol == "bolt") {
|
||||
if (port == 0) port = 7687;
|
||||
} else if (FLAGS_protocol == "postgres") {
|
||||
if (port == 0) port = 5432;
|
||||
}
|
||||
|
||||
while (!istream->eof()) {
|
||||
std::vector<std::string> queries;
|
||||
std::string query;
|
||||
@ -135,27 +123,7 @@ int main(int argc, char **argv) {
|
||||
utils::Trim(query) != ";") {
|
||||
queries.push_back(query);
|
||||
}
|
||||
|
||||
if (FLAGS_protocol == "bolt") {
|
||||
ExecuteQueries<BoltClient>(queries, FLAGS_num_workers, *ostream,
|
||||
FLAGS_address, port, FLAGS_username,
|
||||
FLAGS_password, FLAGS_database);
|
||||
} else if (FLAGS_protocol == "postgres") {
|
||||
LOG(FATAL) << "Postgres not yet supported";
|
||||
// TODO: Currently libpq is linked dynamically so it is a pain to move
|
||||
// harness_client executable to other machines without libpq.
|
||||
// CHECK(FLAGS_username != "") << "Username can't be empty for
|
||||
// postgres!";
|
||||
// CHECK(FLAGS_database != "") << "Database can't be empty for
|
||||
// postgres!";
|
||||
// if (port == "") port = "5432";
|
||||
//
|
||||
// using PostgresClientT = postgres::Client;
|
||||
// using PostgresExceptionT = postgres::ClientQueryException;
|
||||
// ExecuteQueries<PostgresClientT, PostgresExceptionT>(
|
||||
// *istream, FLAGS_num_workers, *ostream, FLAGS_address, port,
|
||||
// FLAGS_username, FLAGS_password, FLAGS_database);
|
||||
}
|
||||
ExecuteQueries(queries, *ostream);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -128,48 +128,3 @@ class Neo:
|
||||
self.database_bin.wait()
|
||||
if os.path.exists(self.neo4j_home_path):
|
||||
shutil.rmtree(self.neo4j_home_path)
|
||||
|
||||
|
||||
class Postgres:
|
||||
"""
|
||||
Knows how to start and stop PostgreSQL.
|
||||
"""
|
||||
def __init__(self, args, cpus):
|
||||
self.log = logging.getLogger("PostgresRunner")
|
||||
argp = ArgumentParser("PostgresArgumentParser")
|
||||
argp.add_argument("--init-bin", default=get_absolute_path(
|
||||
"postgresql/bin/initdb", "libs"))
|
||||
argp.add_argument("--runner-bin", default=get_absolute_path(
|
||||
"postgresql/bin/postgres", "libs"))
|
||||
argp.add_argument("--port", default="5432",
|
||||
help="Database and client port")
|
||||
self.log.info("Initializing Runner with arguments %r", args)
|
||||
self.args, _ = argp.parse_known_args(args)
|
||||
self.username = "macro_benchmark"
|
||||
self.database_bin = jail.get_process()
|
||||
set_cpus("database-cpu-ids", self.database_bin, args)
|
||||
|
||||
def start(self):
|
||||
self.log.info("start")
|
||||
self.data_path = tempfile.mkdtemp(dir="/dev/shm")
|
||||
init_args = ["-D", self.data_path, "-U", self.username]
|
||||
self.database_bin.run_and_wait(self.args.init_bin, init_args)
|
||||
|
||||
# args
|
||||
runner_args = ["-D", self.data_path, "-c", "port=" + self.args.port,
|
||||
"-c", "ssl=false", "-c", "max_worker_processes=1"]
|
||||
|
||||
try:
|
||||
self.database_bin.run(self.args.runner_bin, args=runner_args,
|
||||
timeout=600)
|
||||
except:
|
||||
shutil.rmtree(self.data_path)
|
||||
raise Exception("Couldn't run PostgreSQL!")
|
||||
|
||||
wait_for_server(self.args.port)
|
||||
|
||||
def stop(self):
|
||||
self.database_bin.send_signal(jail.SIGTERM)
|
||||
self.database_bin.wait()
|
||||
if os.path.exists(self.data_path):
|
||||
shutil.rmtree(self.data_path)
|
||||
|
Loading…
Reference in New Issue
Block a user