Add kafka benchmark
Summary:
To add the Kafka benchmark, `memgraph_bolt.cpp` has been split: the common Memgraph startup code now lives in `memgraph_init.cpp/hpp`. The Kafka benchmark implements its own `main` function that doesn't start a Bolt server; it only creates and starts a stream, waits for the stream to start consuming, and measures the time it takes to import the given number of entries. The benchmark lives in a new folder, `feature_benchmark`, which should also hold any future benchmark that measures the performance of Memgraph's features.

Reviewers: mferencevic, teon.banek, ipaljak, vkasljevic

Reviewed By: mferencevic, teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1552
parent d106aff88f
commit 57b84f2da3
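Before the diff itself, a condensed sketch of how the new benchmark is driven, based on the runner script added below (tests/feature_benchmark/kafka/runner.sh); the paths and values are the ones the script itself uses, shown here only for orientation:

  # Start the benchmark binary; it creates and starts a "benchmark" stream,
  # then waits for messages and measures how long the import takes.
  ./build_release/tests/feature_benchmark/kafka/benchmark \
      --import-count=$((100000 + 10000)) \
      --timeout=60 \
      --kafka-uri="127.0.0.1:9092" \
      --topic-name="test" \
      --transform-uri="127.0.0.1:8000/transform.py" \
      --output-file=.apollo_measurements &

  # Pipe the generated dataset into the Kafka topic; once the expected number of
  # entries has been imported (or the timeout hits), the benchmark writes
  # duration, executed_writes and write_per_second to the output file.
  python3 generate.py --nodes 100000 --edges 10000 | \
      ./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test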
@@ -31,7 +31,7 @@
|
||||
mkdir build_release
|
||||
cd build_release
|
||||
cmake -DCMAKE_BUILD_TYPE=release ..
|
||||
TIMEOUT=1200 make -j$THREADS memgraph tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot
|
||||
TIMEOUT=1200 make -j$THREADS memgraph tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark
|
||||
|
||||
# Generate distributed card fraud dataset.
|
||||
cd ../tests/distributed/card_fraud
|
||||
|
@@ -77,6 +77,7 @@ set(memgraph_src_files
|
||||
transactions/engine_single_node.cpp
|
||||
transactions/engine_worker.cpp
|
||||
transactions/snapshot.cpp
|
||||
memgraph_init.cpp
|
||||
)
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
@@ -129,7 +129,6 @@ void Consumer::StartConsuming(
|
||||
transform_alive_.store(true);
|
||||
|
||||
while (is_running_) {
|
||||
// TODO (msantl): Figure out what to do with potential exceptions here.
|
||||
auto batch = this->GetBatch();
|
||||
|
||||
if (batch.empty()) continue;
|
||||
@@ -143,7 +142,7 @@ void Consumer::StartConsuming(
|
||||
// TODO (mferencevic): Figure out what to do with all other exceptions.
|
||||
try {
|
||||
transform.Apply(batch, stream_writer_);
|
||||
} catch (const TransformExecutionException) {
|
||||
} catch (const TransformExecutionException &) {
|
||||
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
|
||||
<< " the transform process has died!";
|
||||
break;
|
||||
|
@@ -67,6 +67,7 @@ class Streams final {
|
||||
/// @param batch_limit number of batches we want to import before stopping
|
||||
///
|
||||
/// @throws StreamDoesntExistException if the stream doesn't exist
|
||||
/// @throws ConsumerRunningException if the consumer is already running
|
||||
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
|
||||
void Start(const std::string &stream_name,
|
||||
std::experimental::optional<int64_t> batch_limit =
|
||||
@@ -77,6 +78,7 @@ class Streams final {
|
||||
/// @param stream_name name of the stream we want to stop consuming
|
||||
///
|
||||
/// @throws StreamDoesntExistException if the stream doesn't exist
|
||||
/// @throws ConsumerStoppedException if the consumer is already stopped
|
||||
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
|
||||
void Stop(const std::string &stream_name);
|
||||
|
||||
|
@@ -1,38 +1,23 @@
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <csignal>
|
||||
#include <cstdint>
|
||||
#include <exception>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "communication/bolt/v1/session.hpp"
|
||||
#include "config.hpp"
|
||||
#include "database/distributed_graph_db.hpp"
|
||||
#include "database/graph_db.hpp"
|
||||
#include "distributed/pull_rpc_clients.hpp"
|
||||
#include "glue/auth.hpp"
|
||||
#include "glue/communication.hpp"
|
||||
#include "integrations/kafka/exceptions.hpp"
|
||||
#include "integrations/kafka/streams.hpp"
|
||||
#include "memgraph_init.hpp"
|
||||
#include "query/distributed_interpreter.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/transaction_engine.hpp"
|
||||
#include "requests/requests.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "telemetry/telemetry.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/sysinfo/memory.hpp"
|
||||
#include "utils/terminate_handler.hpp"
|
||||
#include "version.hpp"
|
||||
|
||||
// Common stuff for enterprise and community editions
|
||||
|
||||
@@ -50,282 +35,30 @@ DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800,
|
||||
FLAG_IN_RANGE(1, INT32_MAX));
|
||||
DEFINE_string(cert_file, "", "Certificate file to use.");
|
||||
DEFINE_string(key_file, "", "Key file to use.");
|
||||
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
||||
DEFINE_HIDDEN_string(
|
||||
log_link_basename, "",
|
||||
"Basename used for symlink creation to the last log file.");
|
||||
DEFINE_uint64(memory_warning_threshold, 1024,
|
||||
"Memory warning threshold, in MB. If Memgraph detects there is "
|
||||
"less available RAM it will log a warning. Set to 0 to "
|
||||
"disable.");
|
||||
|
||||
DEFINE_bool(telemetry_enabled, false,
|
||||
"Set to true to enable telemetry. We collect information about the "
|
||||
"running system (CPU and memory information) and information about "
|
||||
"the database runtime (vertex and edge counts and resource usage) "
|
||||
"to allow for easier improvement of the product.");
|
||||
DECLARE_string(durability_directory);
|
||||
|
||||
/** Encapsulates Dbms and Interpreter that are passed through the network server
|
||||
* and worker to the session. */
|
||||
struct SessionData {
|
||||
database::GraphDb *db{nullptr};
|
||||
query::Interpreter *interpreter{nullptr};
|
||||
auth::Auth auth{
|
||||
std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"};
|
||||
};
|
||||
|
||||
class BoltSession final
|
||||
: public communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream> {
|
||||
public:
|
||||
BoltSession(SessionData &data, communication::InputStream &input_stream,
|
||||
communication::OutputStream &output_stream)
|
||||
: communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>(
|
||||
input_stream, output_stream),
|
||||
transaction_engine_(data.db, data.interpreter),
|
||||
auth_(&data.auth) {}
|
||||
|
||||
using communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms)
|
||||
override {
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
auto result = transaction_engine_.Interpret(query, params_tv);
|
||||
if (user_) {
|
||||
const auto &permissions = user_->GetPermissions();
|
||||
for (const auto &privilege : result.second) {
|
||||
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
|
||||
auth::PermissionLevel::GRANT) {
|
||||
transaction_engine_.Abort();
|
||||
throw communication::bolt::ClientError(
|
||||
"You are not authorized to execute this query! Please contact "
|
||||
"your database administrator.");
|
||||
}
|
||||
}
|
||||
}
|
||||
return result.first;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, communication::bolt::Value> PullAll(
|
||||
TEncoder *encoder) override {
|
||||
try {
|
||||
TypedValueResultStream stream(encoder);
|
||||
const auto &summary = transaction_engine_.PullAll(&stream);
|
||||
std::map<std::string, communication::bolt::Value> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
decoded_summary.emplace(kv.first, glue::ToBoltValue(kv.second));
|
||||
}
|
||||
return decoded_summary;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void Abort() override { transaction_engine_.Abort(); }
|
||||
|
||||
bool Authenticate(const std::string &username,
|
||||
const std::string &password) override {
|
||||
if (!auth_->HasUsers()) return true;
|
||||
user_ = auth_->Authenticate(username, password);
|
||||
return !!user_;
|
||||
}
|
||||
|
||||
private:
|
||||
// Wrapper around TEncoder which converts TypedValue to Value
|
||||
// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(TEncoder *encoder) : encoder_(encoder) {}
|
||||
|
||||
void Result(const std::vector<query::TypedValue> &values) {
|
||||
std::vector<communication::bolt::Value> decoded_values;
|
||||
decoded_values.reserve(values.size());
|
||||
for (const auto &v : values) {
|
||||
decoded_values.push_back(glue::ToBoltValue(v));
|
||||
}
|
||||
encoder_->MessageRecord(decoded_values);
|
||||
}
|
||||
|
||||
private:
|
||||
TEncoder *encoder_;
|
||||
};
|
||||
|
||||
query::TransactionEngine transaction_engine_;
|
||||
auth::Auth *auth_;
|
||||
std::experimental::optional<auth::User> user_;
|
||||
};
|
||||
|
||||
using ServerT = communication::Server<BoltSession, SessionData>;
|
||||
using communication::ServerContext;
|
||||
|
||||
/**
|
||||
* Class that implements ResultStream API for Kafka.
|
||||
*
|
||||
* Kafka doesn't need to stream the import results back to the client so we
|
||||
* don't need any functionality here.
|
||||
*/
|
||||
class KafkaResultStream {
|
||||
public:
|
||||
void Result(const std::vector<query::TypedValue> &) {}
|
||||
};
|
||||
|
||||
// Needed to correctly handle memgraph destruction from a signal handler.
|
||||
// Without having some sort of a flag, it is possible that a signal is handled
|
||||
// when we are exiting main, inside destructors of database::GraphDb and
|
||||
// similar. The signal handler may then initiate another shutdown on memgraph
|
||||
// which is in half destructed state, causing invalid memory access and crash.
|
||||
volatile sig_atomic_t is_shutting_down = 0;
|
||||
|
||||
/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
|
||||
/// In most cases you don't have to call this. If you are using a custom server
|
||||
/// startup function for `WithInit`, then you probably need to use this to
|
||||
/// shutdown your server.
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
|
||||
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
||||
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
||||
// double shutdown.
|
||||
sigset_t block_shutdown_signals;
|
||||
sigemptyset(&block_shutdown_signals);
|
||||
sigaddset(&block_shutdown_signals, SIGTERM);
|
||||
sigaddset(&block_shutdown_signals, SIGINT);
|
||||
|
||||
// Wrap the shutdown function in a safe way to prevent recursive shutdown.
|
||||
auto shutdown = [shutdown_fun]() {
|
||||
if (is_shutting_down) return;
|
||||
is_shutting_down = 1;
|
||||
shutdown_fun();
|
||||
};
|
||||
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
||||
shutdown, block_shutdown_signals))
|
||||
<< "Unable to register SIGTERM handler!";
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
||||
block_shutdown_signals))
|
||||
<< "Unable to register SIGINT handler!";
|
||||
|
||||
// Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate
|
||||
// rotates our logs.
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() {
|
||||
google::CloseLogDestination(google::INFO);
|
||||
})) << "Unable to register SIGUSR1 handler!";
|
||||
}
|
||||
|
||||
/// Run the Memgraph server.
|
||||
///
|
||||
/// Sets up all the required state before running `memgraph_main` and does any
|
||||
/// required cleanup afterwards. `get_stats_prefix` is used to obtain the
|
||||
/// prefix when logging Memgraph's statistics.
|
||||
///
|
||||
/// Command line arguments and configuration files are read before calling any
|
||||
/// of the supplied functions. Therefore, you should use flags only from those
|
||||
/// functions, and *not before* invoking `WithInit`.
|
||||
///
|
||||
/// This should be the first and last thing a OS specific main function does.
|
||||
///
|
||||
/// A common example of usage is:
|
||||
///
|
||||
/// @code
|
||||
/// int main(int argc, char *argv[]) {
|
||||
/// auto get_stats_prefix = []() -> std::string { return "memgraph"; };
|
||||
/// return WithInit(argc, argv, get_stats_prefix, SingleNodeMain);
|
||||
/// }
|
||||
/// @endcode
|
||||
///
|
||||
/// If you wish to start Memgraph server in another way, you can pass a
|
||||
/// `memgraph_main` functions which does that. You should take care to call
|
||||
/// `InitSignalHandlers` with appropriate function to shutdown the server you
|
||||
/// started.
|
||||
int WithInit(int argc, char **argv,
|
||||
const std::function<std::string()> &get_stats_prefix,
|
||||
const std::function<void()> &memgraph_main) {
|
||||
gflags::SetVersionString(version_string);
|
||||
|
||||
// Load config before parsing arguments, so that flags from the command line
|
||||
// overwrite the config.
|
||||
LoadConfig();
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
||||
google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str());
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
stats::InitStatsLogging(get_stats_prefix());
|
||||
utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); });
|
||||
|
||||
// Initialize the communication library.
|
||||
communication::Init();
|
||||
|
||||
// Start memory warning logger.
|
||||
utils::Scheduler mem_log_scheduler;
|
||||
if (FLAGS_memory_warning_threshold > 0) {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram) {
|
||||
mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold)
|
||||
LOG(WARNING) << "Running out of available RAM, only "
|
||||
<< *free_ram / 1024 << " MB left.";
|
||||
});
|
||||
} else {
|
||||
// Kernel version for the `MemAvailable` value is from: man procfs
|
||||
LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc "
|
||||
"filesystem isn't available so remaining memory warnings "
|
||||
"won't be available.";
|
||||
}
|
||||
}
|
||||
requests::Init();
|
||||
|
||||
memgraph_main();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void SingleNodeMain() {
|
||||
google::SetUsageMessage("Memgraph single-node database server");
|
||||
database::SingleNode db;
|
||||
query::Interpreter interpreter;
|
||||
SessionData session_data{&db, &interpreter};
|
||||
|
||||
auto stream_writer =
|
||||
[&session_data](
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
auto dba = session_data.db->Access();
|
||||
KafkaResultStream stream;
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
(*session_data.interpreter)(query, *dba, params_tv, false)
|
||||
.PullAll(stream);
|
||||
dba->Commit();
|
||||
} catch (const query::QueryException &e) {
|
||||
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
|
||||
<< e.what();
|
||||
dba->Abort();
|
||||
}
|
||||
};
|
||||
|
||||
integrations::kafka::Streams kafka_streams{
|
||||
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
||||
"streams",
|
||||
stream_writer};
|
||||
[&session_data](
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
KafkaStreamWriter(session_data, query, params);
|
||||
}};
|
||||
|
||||
try {
|
||||
// Recover possible streams.
|
||||
@@ -399,30 +132,14 @@ void MasterMain() {
|
||||
query::DistributedInterpreter interpreter(&db);
|
||||
SessionData session_data{&db, &interpreter};
|
||||
|
||||
auto stream_writer =
|
||||
[&session_data](
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
auto dba = session_data.db->Access();
|
||||
KafkaResultStream stream;
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
(*session_data.interpreter)(query, *dba, params_tv, false)
|
||||
.PullAll(stream);
|
||||
dba->Commit();
|
||||
} catch (const query::QueryException &e) {
|
||||
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
|
||||
<< e.what();
|
||||
dba->Abort();
|
||||
}
|
||||
};
|
||||
|
||||
integrations::kafka::Streams kafka_streams{
|
||||
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
||||
"streams",
|
||||
stream_writer};
|
||||
[&session_data](
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
KafkaStreamWriter(session_data, query, params);
|
||||
}};
|
||||
|
||||
try {
|
||||
// Recover possible streams.
|
||||
@@ -431,6 +148,7 @@ void MasterMain() {
|
||||
LOG(ERROR) << e.what();
|
||||
}
|
||||
|
||||
session_data.interpreter->auth_ = &session_data.auth;
|
||||
session_data.interpreter->kafka_streams_ = &kafka_streams;
|
||||
|
||||
ServerContext context;
|
||||
|
206
src/memgraph_init.cpp
Normal file
@@ -0,0 +1,206 @@
|
||||
#include "memgraph_init.hpp"
|
||||
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "config.hpp"
|
||||
#include "glue/auth.hpp"
|
||||
#include "glue/communication.hpp"
|
||||
#include "query/exceptions.hpp"
|
||||
#include "requests/requests.hpp"
|
||||
#include "stats/stats.hpp"
|
||||
#include "utils/signals.hpp"
|
||||
#include "utils/sysinfo/memory.hpp"
|
||||
#include "utils/terminate_handler.hpp"
|
||||
#include "version.hpp"
|
||||
|
||||
DEFINE_string(log_file, "", "Path to where the log should be stored.");
|
||||
DEFINE_HIDDEN_string(
|
||||
log_link_basename, "",
|
||||
"Basename used for symlink creation to the last log file.");
|
||||
DEFINE_uint64(memory_warning_threshold, 1024,
|
||||
"Memory warning threshold, in MB. If Memgraph detects there is "
|
||||
"less available RAM it will log a warning. Set to 0 to "
|
||||
"disable.");
|
||||
|
||||
BoltSession::BoltSession(SessionData &data,
|
||||
communication::InputStream &input_stream,
|
||||
communication::OutputStream &output_stream)
|
||||
: communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>(input_stream,
|
||||
output_stream),
|
||||
transaction_engine_(data.db, data.interpreter),
|
||||
auth_(&data.auth) {}
|
||||
|
||||
using TEncoder =
|
||||
communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> BoltSession::Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
auto result = transaction_engine_.Interpret(query, params_tv);
|
||||
if (user_) {
|
||||
const auto &permissions = user_->GetPermissions();
|
||||
for (const auto &privilege : result.second) {
|
||||
if (permissions.Has(glue::PrivilegeToPermission(privilege)) !=
|
||||
auth::PermissionLevel::GRANT) {
|
||||
transaction_engine_.Abort();
|
||||
throw communication::bolt::ClientError(
|
||||
"You are not authorized to execute this query! Please contact "
|
||||
"your database administrator.");
|
||||
}
|
||||
}
|
||||
}
|
||||
return result.first;
|
||||
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
std::map<std::string, communication::bolt::Value> BoltSession::PullAll(
|
||||
TEncoder *encoder) {
|
||||
try {
|
||||
TypedValueResultStream stream(encoder);
|
||||
const auto &summary = transaction_engine_.PullAll(&stream);
|
||||
std::map<std::string, communication::bolt::Value> decoded_summary;
|
||||
for (const auto &kv : summary) {
|
||||
decoded_summary.emplace(kv.first, glue::ToBoltValue(kv.second));
|
||||
}
|
||||
return decoded_summary;
|
||||
} catch (const query::QueryException &e) {
|
||||
// Wrap QueryException into ClientError, because we want to allow the
|
||||
// client to fix their query.
|
||||
throw communication::bolt::ClientError(e.what());
|
||||
}
|
||||
}
|
||||
|
||||
void BoltSession::Abort() { transaction_engine_.Abort(); }
|
||||
|
||||
bool BoltSession::Authenticate(const std::string &username,
|
||||
const std::string &password) {
|
||||
if (!auth_->HasUsers()) return true;
|
||||
user_ = auth_->Authenticate(username, password);
|
||||
return !!user_;
|
||||
}
|
||||
|
||||
BoltSession::TypedValueResultStream::TypedValueResultStream(TEncoder *encoder)
|
||||
: encoder_(encoder) {}
|
||||
|
||||
void BoltSession::TypedValueResultStream::Result(
|
||||
const std::vector<query::TypedValue> &values) {
|
||||
std::vector<communication::bolt::Value> decoded_values;
|
||||
decoded_values.reserve(values.size());
|
||||
for (const auto &v : values) {
|
||||
decoded_values.push_back(glue::ToBoltValue(v));
|
||||
}
|
||||
encoder_->MessageRecord(decoded_values);
|
||||
}
|
||||
|
||||
void KafkaStreamWriter(
|
||||
SessionData &session_data, const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
auto dba = session_data.db->Access();
|
||||
KafkaResultStream stream;
|
||||
std::map<std::string, query::TypedValue> params_tv;
|
||||
for (const auto &kv : params)
|
||||
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
|
||||
try {
|
||||
(*session_data.interpreter)(query, *dba, params_tv, false).PullAll(stream);
|
||||
dba->Commit();
|
||||
} catch (const query::QueryException &e) {
|
||||
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
|
||||
<< e.what();
|
||||
dba->Abort();
|
||||
}
|
||||
};
|
||||
|
||||
// Needed to correctly handle memgraph destruction from a signal handler.
|
||||
// Without having some sort of a flag, it is possible that a signal is handled
|
||||
// when we are exiting main, inside destructors of database::GraphDb and
|
||||
// similar. The signal handler may then initiate another shutdown on memgraph
|
||||
// which is in half destructed state, causing invalid memory access and crash.
|
||||
volatile sig_atomic_t is_shutting_down = 0;
|
||||
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun) {
|
||||
// Prevent handling shutdown inside a shutdown. For example, SIGINT handler
|
||||
// being interrupted by SIGTERM before is_shutting_down is set, thus causing
|
||||
// double shutdown.
|
||||
sigset_t block_shutdown_signals;
|
||||
sigemptyset(&block_shutdown_signals);
|
||||
sigaddset(&block_shutdown_signals, SIGTERM);
|
||||
sigaddset(&block_shutdown_signals, SIGINT);
|
||||
|
||||
// Wrap the shutdown function in a safe way to prevent recursive shutdown.
|
||||
auto shutdown = [shutdown_fun]() {
|
||||
if (is_shutting_down) return;
|
||||
is_shutting_down = 1;
|
||||
shutdown_fun();
|
||||
};
|
||||
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate,
|
||||
shutdown, block_shutdown_signals))
|
||||
<< "Unable to register SIGTERM handler!";
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown,
|
||||
block_shutdown_signals))
|
||||
<< "Unable to register SIGINT handler!";
|
||||
|
||||
// Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate
|
||||
// rotates our logs.
|
||||
CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() {
|
||||
google::CloseLogDestination(google::INFO);
|
||||
})) << "Unable to register SIGUSR1 handler!";
|
||||
}
|
||||
|
||||
int WithInit(int argc, char **argv,
|
||||
const std::function<std::string()> &get_stats_prefix,
|
||||
const std::function<void()> &memgraph_main) {
|
||||
gflags::SetVersionString(version_string);
|
||||
|
||||
// Load config before parsing arguments, so that flags from the command line
|
||||
// overwrite the config.
|
||||
LoadConfig();
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
google::SetLogDestination(google::INFO, FLAGS_log_file.c_str());
|
||||
google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str());
|
||||
|
||||
// Unhandled exception handler init.
|
||||
std::set_terminate(&utils::TerminateHandler);
|
||||
|
||||
stats::InitStatsLogging(get_stats_prefix());
|
||||
utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); });
|
||||
|
||||
// Initialize the communication library.
|
||||
communication::Init();
|
||||
|
||||
// Start memory warning logger.
|
||||
utils::Scheduler mem_log_scheduler;
|
||||
if (FLAGS_memory_warning_threshold > 0) {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram) {
|
||||
mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] {
|
||||
auto free_ram = utils::sysinfo::AvailableMemoryKilobytes();
|
||||
if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold)
|
||||
LOG(WARNING) << "Running out of available RAM, only "
|
||||
<< *free_ram / 1024 << " MB left.";
|
||||
});
|
||||
} else {
|
||||
// Kernel version for the `MemAvailable` value is from: man procfs
|
||||
LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc "
|
||||
"filesystem isn't available so remaining memory warnings "
|
||||
"won't be available.";
|
||||
}
|
||||
}
|
||||
requests::Init();
|
||||
|
||||
memgraph_main();
|
||||
return 0;
|
||||
}
|
115
src/memgraph_init.hpp
Normal file
@@ -0,0 +1,115 @@
|
||||
#pragma once
|
||||
|
||||
#include <csignal>
|
||||
#include <experimental/filesystem>
|
||||
#include <experimental/optional>
|
||||
#include <map>
|
||||
#include <string>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "auth/auth.hpp"
|
||||
#include "communication/bolt/v1/session.hpp"
|
||||
#include "distributed/pull_rpc_clients.hpp"
|
||||
#include "query/interpreter.hpp"
|
||||
#include "query/transaction_engine.hpp"
|
||||
|
||||
DECLARE_string(durability_directory);
|
||||
|
||||
/// Encapsulates Dbms and Interpreter that are passed through the network server
|
||||
/// and worker to the session.
|
||||
struct SessionData {
|
||||
database::GraphDb *db{nullptr};
|
||||
query::Interpreter *interpreter{nullptr};
|
||||
auth::Auth auth{
|
||||
std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"};
|
||||
};
|
||||
|
||||
class BoltSession final
|
||||
: public communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream> {
|
||||
public:
|
||||
BoltSession(SessionData &data, communication::InputStream &input_stream,
|
||||
communication::OutputStream &output_stream);
|
||||
|
||||
using communication::bolt::Session<communication::InputStream,
|
||||
communication::OutputStream>::TEncoder;
|
||||
|
||||
std::vector<std::string> Interpret(
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) override;
|
||||
|
||||
std::map<std::string, communication::bolt::Value> PullAll(
|
||||
TEncoder *encoder) override;
|
||||
|
||||
void Abort() override;
|
||||
|
||||
bool Authenticate(const std::string &username,
|
||||
const std::string &password) override;
|
||||
|
||||
private:
|
||||
/// Wrapper around TEncoder which converts TypedValue to Value
|
||||
/// before forwarding the calls to original TEncoder.
|
||||
class TypedValueResultStream {
|
||||
public:
|
||||
TypedValueResultStream(TEncoder *encoder);
|
||||
|
||||
void Result(const std::vector<query::TypedValue> &values);
|
||||
|
||||
private:
|
||||
TEncoder *encoder_;
|
||||
};
|
||||
|
||||
query::TransactionEngine transaction_engine_;
|
||||
auth::Auth *auth_;
|
||||
std::experimental::optional<auth::User> user_;
|
||||
};
|
||||
|
||||
/// Class that implements ResultStream API for Kafka.
|
||||
///
|
||||
/// Kafka doesn't need to stream the import results back to the client so we
|
||||
/// don't need any functionality here.
|
||||
class KafkaResultStream {
|
||||
public:
|
||||
void Result(const std::vector<query::TypedValue> &) {}
|
||||
};
|
||||
|
||||
/// Writes data streamed from kafka to memgraph.
|
||||
void KafkaStreamWriter(
|
||||
SessionData &session_data, const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms);
|
||||
|
||||
/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
|
||||
/// In most cases you don't have to call this. If you are using a custom server
|
||||
/// startup function for `WithInit`, then you probably need to use this to
|
||||
/// shutdown your server.
|
||||
void InitSignalHandlers(const std::function<void()> &shutdown_fun);
|
||||
|
||||
/// Run the Memgraph server.
|
||||
///
|
||||
/// Sets up all the required state before running `memgraph_main` and does any
|
||||
/// required cleanup afterwards. `get_stats_prefix` is used to obtain the
|
||||
/// prefix when logging Memgraph's statistics.
|
||||
///
|
||||
/// Command line arguments and configuration files are read before calling any
|
||||
/// of the supplied functions. Therefore, you should use flags only from those
|
||||
/// functions, and *not before* invoking `WithInit`.
|
||||
///
|
||||
/// This should be the first and last thing an OS specific main function does.
|
||||
///
|
||||
/// A common example of usage is:
|
||||
///
|
||||
/// @code
|
||||
/// int main(int argc, char *argv[]) {
|
||||
/// auto get_stats_prefix = []() -> std::string { return "memgraph"; };
|
||||
/// return WithInit(argc, argv, get_stats_prefix, SingleNodeMain);
|
||||
/// }
|
||||
/// @endcode
|
||||
///
|
||||
/// If you wish to start Memgraph server in another way, you can pass a
|
||||
/// `memgraph_main` function which does that. You should take care to call
|
||||
/// `InitSignalHandlers` with appropriate function to shutdown the server you
|
||||
/// started.
|
||||
int WithInit(int argc, char **argv,
|
||||
const std::function<std::string()> &get_stats_prefix,
|
||||
const std::function<void()> &memgraph_main);
|
@@ -27,3 +27,6 @@ add_subdirectory(property_based)
|
||||
|
||||
# integration test binaries
|
||||
add_subdirectory(integration)
|
||||
|
||||
# feature benchmark test binaries
|
||||
add_subdirectory(feature_benchmark)
|
||||
|
2
tests/feature_benchmark/CMakeLists.txt
Normal file
@@ -0,0 +1,2 @@
|
||||
# kafka test binaries
|
||||
add_subdirectory(kafka)
|
10
tests/feature_benchmark/apollo_runs.yaml
Normal file
@@ -0,0 +1,10 @@
|
||||
- name: feature_benchmark__kafka
|
||||
cd: kafka
|
||||
commands: ./runner.sh
|
||||
infiles:
|
||||
- runner.sh # runner script
|
||||
- transform.py # transform script
|
||||
- generate.py # dataset generator script
|
||||
- ../../../build_release/tests/feature_benchmark/kafka/kafka.py # kafka script
|
||||
- ../../../build_release/tests/feature_benchmark/kafka/benchmark # benchmark binary
|
||||
enable_network: true
|
9
tests/feature_benchmark/kafka/CMakeLists.txt
Normal file
@@ -0,0 +1,9 @@
|
||||
set(target_name memgraph__feature_benchmark__kafka)
|
||||
|
||||
set(benchmark_target_name ${target_name}__benchmark)
|
||||
add_executable(${benchmark_target_name} benchmark.cpp)
|
||||
set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark)
|
||||
target_link_libraries(${benchmark_target_name} memgraph_lib kvstore_lib)
|
||||
|
||||
# Copy kafka.py to the feature integration kafka folder
|
||||
configure_file(${PROJECT_SOURCE_DIR}/src/integrations/kafka/kafka.py ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)
|
108
tests/feature_benchmark/kafka/benchmark.cpp
Normal file
@@ -0,0 +1,108 @@
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <exception>
|
||||
#include <fstream>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <thread>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
#include "json/json.hpp"
|
||||
|
||||
#include "database/graph_db.hpp"
|
||||
#include "integrations/kafka/streams.hpp"
|
||||
#include "memgraph_init.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
DEFINE_int64(import_count, 0, "How many entries should we import.");
|
||||
DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait.");
|
||||
DEFINE_string(kafka_uri, "", "Kafka URI.");
|
||||
DEFINE_string(topic_name, "", "Kafka topic.");
|
||||
DEFINE_string(transform_uri, "", "Transform script URI.");
|
||||
DEFINE_string(output_file, "", "Output file where the results should be stored.");
|
||||
|
||||
void KafkaBenchmarkMain() {
|
||||
google::SetUsageMessage("Memgraph kafka benchmark database server");
|
||||
query::Interpreter interpreter;
|
||||
database::SingleNode db;
|
||||
SessionData session_data{&db, &interpreter};
|
||||
|
||||
std::atomic<int64_t> query_counter{0};
|
||||
std::atomic<bool> timeout_reached{false};
|
||||
std::atomic<bool> benchmark_finished{false};
|
||||
|
||||
integrations::kafka::Streams kafka_streams{
|
||||
std::experimental::filesystem::path(FLAGS_durability_directory) /
|
||||
"streams",
|
||||
[&session_data, &query_counter](
|
||||
const std::string &query,
|
||||
const std::map<std::string, communication::bolt::Value> ¶ms) {
|
||||
KafkaStreamWriter(session_data, query, params);
|
||||
query_counter++;
|
||||
}};
|
||||
|
||||
session_data.interpreter->auth_ = &session_data.auth;
|
||||
session_data.interpreter->kafka_streams_ = &kafka_streams;
|
||||
|
||||
std::string stream_name = "benchmark";
|
||||
|
||||
integrations::kafka::StreamInfo stream_info;
|
||||
stream_info.stream_name = stream_name;
|
||||
stream_info.stream_uri = FLAGS_kafka_uri;
|
||||
stream_info.stream_topic = FLAGS_topic_name;
|
||||
stream_info.transform_uri = FLAGS_transform_uri;
|
||||
|
||||
kafka_streams.Create(stream_info);
|
||||
kafka_streams.StartAll();
|
||||
|
||||
// Kick off a thread that will time out after FLAGS_timeout seconds
|
||||
std::thread timeout_thread_ =
|
||||
std::thread([&timeout_reached, &benchmark_finished]() {
|
||||
utils::ThreadSetName("BenchTimeout");
|
||||
for (int64_t i = 0; i < FLAGS_timeout; ++i) {
|
||||
std::this_thread::sleep_for(1s);
|
||||
if (benchmark_finished.load()) return;
|
||||
}
|
||||
|
||||
timeout_reached.store(true);
|
||||
});
|
||||
|
||||
// Wait for the import to start
|
||||
while (!timeout_reached.load() && query_counter.load() == 0) {
|
||||
std::this_thread::sleep_for(1ms);
|
||||
}
|
||||
|
||||
int64_t query_count_start = query_counter.load();
|
||||
utils::Timer timer;
|
||||
|
||||
// Wait for the import to finish
|
||||
while (!timeout_reached.load() && query_counter.load() < FLAGS_import_count) {
|
||||
std::this_thread::sleep_for(1ms);
|
||||
}
|
||||
|
||||
double duration = timer.Elapsed().count();
|
||||
kafka_streams.StopAll();
|
||||
kafka_streams.Drop(stream_name);
|
||||
|
||||
benchmark_finished.store(true);
|
||||
if (timeout_thread_.joinable()) timeout_thread_.join();
|
||||
|
||||
int64_t writes = query_counter.load() - query_count_start;
|
||||
double write_per_second = writes / duration;
|
||||
|
||||
std::ofstream output(FLAGS_output_file);
|
||||
output << "duration " << duration << std::endl;
|
||||
output << "executed_writes " << writes << std::endl;
|
||||
output << "write_per_second " << write_per_second << std::endl;
|
||||
output.close();
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
return WithInit(argc, argv, []() { return "memgraph"; }, KafkaBenchmarkMain);
|
||||
}
|
44
tests/feature_benchmark/kafka/generate.py
Normal file
@@ -0,0 +1,44 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
'''
|
||||
Kafka benchmark dataset generator.
|
||||
'''
|
||||
|
||||
|
||||
import random
|
||||
import sys
|
||||
from argparse import ArgumentParser
|
||||
|
||||
|
||||
def get_edge():
|
||||
from_node = random.randint(0, args.nodes)
|
||||
to_node = random.randint(0, args.nodes)
|
||||
|
||||
while from_node == to_node:
|
||||
to_node = random.randint(0, args.nodes)
|
||||
return (from_node, to_node)
|
||||
|
||||
|
||||
def parse_args():
|
||||
argp = ArgumentParser(description=__doc__)
|
||||
argp.add_argument("--nodes", type=int, default=100, help="Number of nodes.")
|
||||
argp.add_argument("--edges", type=int, default=30, help="Number of edges.")
|
||||
return argp.parse_args()
|
||||
|
||||
|
||||
args = parse_args()
|
||||
edges = set()
|
||||
|
||||
for i in range(args.nodes):
|
||||
print("%d\n" % i)
|
||||
|
||||
for i in range(args.edges):
|
||||
edge = get_edge()
|
||||
while edge in edges:
|
||||
edge = get_edge()
|
||||
|
||||
edges.add(edge)
|
||||
print("%d %d\n" % edge)
|
||||
|
||||
sys.exit(0)
|
143
tests/feature_benchmark/kafka/runner.sh
Executable file
@@ -0,0 +1,143 @@
|
||||
#!/bin/bash
|
||||
|
||||
## Helper functions
|
||||
|
||||
function wait_for_server {
|
||||
port=$1
|
||||
while ! nc -z -w 1 127.0.0.1 $port; do
|
||||
sleep 0.1
|
||||
done
|
||||
sleep 1
|
||||
}
|
||||
|
||||
function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
|
||||
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
|
||||
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }
|
||||
|
||||
|
||||
## Environment setup
|
||||
|
||||
# Get script location.
|
||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
cd "$DIR"
|
||||
|
||||
# Create a temporary directory.
|
||||
tmpdir=/tmp/memgraph_benchmark_kafka
|
||||
if [ -d $tmpdir ]; then
|
||||
rm -rf $tmpdir
|
||||
fi
|
||||
mkdir -p $tmpdir
|
||||
cd $tmpdir
|
||||
|
||||
# Download the kafka binaries.
|
||||
kafka="kafka_2.11-2.0.0"
|
||||
wget -nv http://deps.memgraph.io/$kafka.tgz
|
||||
tar -xf $kafka.tgz
|
||||
mv $kafka kafka
|
||||
|
||||
# Find memgraph binaries.
|
||||
binary_dir="$DIR/../../../build"
|
||||
if [ ! -d $binary_dir ]; then
|
||||
binary_dir="$DIR/../../../build_release"
|
||||
fi
|
||||
|
||||
# Cleanup old kafka logs.
|
||||
if [ -d /tmp/kafka-logs ]; then
|
||||
rm -rf /tmp/kafka-logs
|
||||
fi
|
||||
if [ -d /tmp/zookeeper ]; then
|
||||
rm -rf /tmp/zookeeper
|
||||
fi
|
||||
|
||||
# Results for apollo
|
||||
RESULTS="$DIR/.apollo_measurements"
|
||||
|
||||
# Benchmark parameters
|
||||
NODES=100000
|
||||
EDGES=10000
|
||||
|
||||
|
||||
## Startup
|
||||
|
||||
# Start the zookeeper process and wait for it to start.
|
||||
echo_info "Starting zookeeper"
|
||||
./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
|
||||
wait_for_server 2181
|
||||
echo_success "Started zookeeper"
|
||||
|
||||
# Start the kafka process and wait for it to start.
|
||||
echo_info "Starting kafka"
|
||||
./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
|
||||
wait_for_server 9092
|
||||
echo_success "Started kafka"
|
||||
|
||||
# Create the kafka topic.
|
||||
echo_info "Creating kafka topic test"
|
||||
./kafka/bin/kafka-topics.sh --create \
|
||||
--zookeeper 127.0.0.1:2181 \
|
||||
--replication-factor 1 \
|
||||
--partitions 1 \
|
||||
--topic test
|
||||
echo_success "Created kafka topic test"
|
||||
|
||||
# Start a http server to serve the transform script.
|
||||
echo_info "Starting Python HTTP server"
|
||||
mkdir serve
|
||||
cd serve
|
||||
cp "$DIR/transform.py" transform.py
|
||||
python3 -m http.server &
|
||||
http_server_pid=$!
|
||||
wait_for_server 8000
|
||||
cd ..
|
||||
echo_success "Started Python HTTP server"
|
||||
|
||||
|
||||
# Start the memgraph process and wait for it to start.
|
||||
echo_info "Starting kafka benchmark"
|
||||
$binary_dir/tests/feature_benchmark/kafka/benchmark \
|
||||
--import-count=$(($NODES + $EDGES)) \
|
||||
--timeout=60 \
|
||||
--kafka-uri="127.0.0.1:9092" \
|
||||
--topic-name="test" \
|
||||
--transform-uri="127.0.0.1:8000/transform.py" \
|
||||
--output-file=$RESULTS &
|
||||
pid=$!
|
||||
|
||||
# Allow benchmark to initialize
|
||||
sleep 5
|
||||
|
||||
echo_info "Generating kafka messages"
|
||||
python3 "$DIR/generate.py" --nodes $NODES --edges $EDGES | \
|
||||
./kafka/bin/kafka-console-producer.sh \
|
||||
--broker-list 127.0.0.1:9092 \
|
||||
--topic test \
|
||||
> /dev/null
|
||||
echo_success "Finished generating kafka messages"
|
||||
|
||||
wait -n $pid
|
||||
code=$?
|
||||
|
||||
if [ $code -eq 0 ]; then
|
||||
echo_success "Benchmark finished successfully"
|
||||
else
|
||||
echo_failure "Benchmark didn't finish successfully"
|
||||
fi
|
||||
|
||||
## Cleanup
|
||||
|
||||
echo_info "Starting test cleanup"
|
||||
|
||||
# Shutdown the http server.
|
||||
kill $http_server_pid
|
||||
wait -n
|
||||
|
||||
# Shutdown the kafka process.
|
||||
./kafka/bin/kafka-server-stop.sh
|
||||
|
||||
# Shutdown the zookeeper process.
|
||||
./kafka/bin/zookeeper-server-stop.sh
|
||||
|
||||
echo_success "Test cleanup done"
|
||||
|
||||
[ $code -ne 0 ] && exit $code
|
||||
exit 0
|
15
tests/feature_benchmark/kafka/transform.py
Normal file
@@ -0,0 +1,15 @@
|
||||
index_done = False
|
||||
|
||||
def stream(batch):
|
||||
global index_done
|
||||
ret = []
|
||||
if not index_done:
|
||||
ret.append(("CREATE INDEX ON :node(num)", {}))
|
||||
index_done = True
|
||||
for item in batch:
|
||||
message = item.decode("utf-8").strip().split()
|
||||
if len(message) == 1:
|
||||
ret.append(("CREATE (:node {num: $num})", {"num": message[0]}))
|
||||
elif len(message) == 2:
|
||||
ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]}))
|
||||
return ret
|