diff --git a/apollo_build.yaml b/apollo_build.yaml index 00afbba8a..c2c4a0f3b 100644 --- a/apollo_build.yaml +++ b/apollo_build.yaml @@ -31,7 +31,7 @@ mkdir build_release cd build_release cmake -DCMAKE_BUILD_TYPE=release .. - TIMEOUT=1200 make -j$THREADS memgraph tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot + TIMEOUT=1200 make -j$THREADS memgraph tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark # Generate distributed card fraud dataset. cd ../tests/distributed/card_fraud diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e6bf5b386..947c514fd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -77,6 +77,7 @@ set(memgraph_src_files transactions/engine_single_node.cpp transactions/engine_worker.cpp transactions/snapshot.cpp + memgraph_init.cpp ) # ----------------------------------------------------------------------------- diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp index f357fa46c..dc77c25f6 100644 --- a/src/integrations/kafka/consumer.cpp +++ b/src/integrations/kafka/consumer.cpp @@ -129,7 +129,6 @@ void Consumer::StartConsuming( transform_alive_.store(true); while (is_running_) { - // TODO (msantl): Figure out what to do with potential exceptions here. auto batch = this->GetBatch(); if (batch.empty()) continue; @@ -143,7 +142,7 @@ void Consumer::StartConsuming( // TODO (mferencevic): Figure out what to do with all other exceptions. try { transform.Apply(batch, stream_writer_); - } catch (const TransformExecutionException) { + } catch (const TransformExecutionException &) { LOG(WARNING) << "[Kafka] stream " << info_.stream_name << " the transform process has died!"; break; diff --git a/src/integrations/kafka/streams.hpp b/src/integrations/kafka/streams.hpp index 4039160fb..72aaec9dd 100644 --- a/src/integrations/kafka/streams.hpp +++ b/src/integrations/kafka/streams.hpp @@ -67,6 +67,7 @@ class Streams final { /// @param batch_limit number of batches we want to import before stopping /// /// @throws StreamDoesntExistException if the stream doesn't exist + /// @throws ConsumerRunningException if the consumer is already running /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata void Start(const std::string &stream_name, std::experimental::optional batch_limit = @@ -77,6 +78,7 @@ class Streams final { /// @param stream_name name of the stream we wanto to stop consuming /// /// @throws StreamDoesntExistException if the stream doesn't exist + /// @throws ConsumerStoppedException if the consumer is already stopped /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata void Stop(const std::string &stream_name); diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index c971ad7de..54f960b7c 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -1,38 +1,23 @@ #include #include -#include #include #include #include #include -#include #include #include #include -#include "auth/auth.hpp" -#include "communication/bolt/v1/session.hpp" -#include "config.hpp" #include "database/distributed_graph_db.hpp" #include "database/graph_db.hpp" -#include "distributed/pull_rpc_clients.hpp" -#include "glue/auth.hpp" -#include "glue/communication.hpp" #include "integrations/kafka/exceptions.hpp" #include "integrations/kafka/streams.hpp" +#include "memgraph_init.hpp" #include "query/distributed_interpreter.hpp" #include "query/exceptions.hpp" -#include 
"query/interpreter.hpp" -#include "query/transaction_engine.hpp" -#include "requests/requests.hpp" -#include "stats/stats.hpp" #include "telemetry/telemetry.hpp" #include "utils/flag_validation.hpp" -#include "utils/signals.hpp" -#include "utils/sysinfo/memory.hpp" -#include "utils/terminate_handler.hpp" -#include "version.hpp" // Common stuff for enterprise and community editions @@ -50,282 +35,30 @@ DEFINE_VALIDATED_int32(session_inactivity_timeout, 1800, FLAG_IN_RANGE(1, INT32_MAX)); DEFINE_string(cert_file, "", "Certificate file to use."); DEFINE_string(key_file, "", "Key file to use."); -DEFINE_string(log_file, "", "Path to where the log should be stored."); -DEFINE_HIDDEN_string( - log_link_basename, "", - "Basename used for symlink creation to the last log file."); -DEFINE_uint64(memory_warning_threshold, 1024, - "Memory warning threshold, in MB. If Memgraph detects there is " - "less available RAM it will log a warning. Set to 0 to " - "disable."); + DEFINE_bool(telemetry_enabled, false, "Set to true to enable telemetry. We collect information about the " "running system (CPU and memory information) and information about " "the database runtime (vertex and edge counts and resource usage) " "to allow for easier improvement of the product."); -DECLARE_string(durability_directory); - -/** Encapsulates Dbms and Interpreter that are passed through the network server - * and worker to the session. */ -struct SessionData { - database::GraphDb *db{nullptr}; - query::Interpreter *interpreter{nullptr}; - auth::Auth auth{ - std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"}; -}; - -class BoltSession final - : public communication::bolt::Session { - public: - BoltSession(SessionData &data, communication::InputStream &input_stream, - communication::OutputStream &output_stream) - : communication::bolt::Session( - input_stream, output_stream), - transaction_engine_(data.db, data.interpreter), - auth_(&data.auth) {} - - using communication::bolt::Session::TEncoder; - - std::vector Interpret( - const std::string &query, - const std::map ¶ms) - override { - std::map params_tv; - for (const auto &kv : params) - params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); - try { - auto result = transaction_engine_.Interpret(query, params_tv); - if (user_) { - const auto &permissions = user_->GetPermissions(); - for (const auto &privilege : result.second) { - if (permissions.Has(glue::PrivilegeToPermission(privilege)) != - auth::PermissionLevel::GRANT) { - transaction_engine_.Abort(); - throw communication::bolt::ClientError( - "You are not authorized to execute this query! Please contact " - "your database administrator."); - } - } - } - return result.first; - } catch (const query::QueryException &e) { - // Wrap QueryException into ClientError, because we want to allow the - // client to fix their query. - throw communication::bolt::ClientError(e.what()); - } - } - - std::map PullAll( - TEncoder *encoder) override { - try { - TypedValueResultStream stream(encoder); - const auto &summary = transaction_engine_.PullAll(&stream); - std::map decoded_summary; - for (const auto &kv : summary) { - decoded_summary.emplace(kv.first, glue::ToBoltValue(kv.second)); - } - return decoded_summary; - } catch (const query::QueryException &e) { - // Wrap QueryException into ClientError, because we want to allow the - // client to fix their query. 
- throw communication::bolt::ClientError(e.what()); - } - } - - void Abort() override { transaction_engine_.Abort(); } - - bool Authenticate(const std::string &username, - const std::string &password) override { - if (!auth_->HasUsers()) return true; - user_ = auth_->Authenticate(username, password); - return !!user_; - } - - private: - // Wrapper around TEncoder which converts TypedValue to Value - // before forwarding the calls to original TEncoder. - class TypedValueResultStream { - public: - TypedValueResultStream(TEncoder *encoder) : encoder_(encoder) {} - - void Result(const std::vector &values) { - std::vector decoded_values; - decoded_values.reserve(values.size()); - for (const auto &v : values) { - decoded_values.push_back(glue::ToBoltValue(v)); - } - encoder_->MessageRecord(decoded_values); - } - - private: - TEncoder *encoder_; - }; - - query::TransactionEngine transaction_engine_; - auth::Auth *auth_; - std::experimental::optional user_; -}; using ServerT = communication::Server; using communication::ServerContext; -/** - * Class that implements ResultStream API for Kafka. - * - * Kafka doesn't need to stream the import results back to the client so we - * don't need any functionality here. - */ -class KafkaResultStream { - public: - void Result(const std::vector &) {} -}; - -// Needed to correctly handle memgraph destruction from a signal handler. -// Without having some sort of a flag, it is possible that a signal is handled -// when we are exiting main, inside destructors of database::GraphDb and -// similar. The signal handler may then initiate another shutdown on memgraph -// which is in half destructed state, causing invalid memory access and crash. -volatile sig_atomic_t is_shutting_down = 0; - -/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT. -/// In most cases you don't have to call this. If you are using a custom server -/// startup function for `WithInit`, then you probably need to use this to -/// shutdown your server. -void InitSignalHandlers(const std::function &shutdown_fun) { - // Prevent handling shutdown inside a shutdown. For example, SIGINT handler - // being interrupted by SIGTERM before is_shutting_down is set, thus causing - // double shutdown. - sigset_t block_shutdown_signals; - sigemptyset(&block_shutdown_signals); - sigaddset(&block_shutdown_signals, SIGTERM); - sigaddset(&block_shutdown_signals, SIGINT); - - // Wrap the shutdown function in a safe way to prevent recursive shutdown. - auto shutdown = [shutdown_fun]() { - if (is_shutting_down) return; - is_shutting_down = 1; - shutdown_fun(); - }; - - CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate, - shutdown, block_shutdown_signals)) - << "Unable to register SIGTERM handler!"; - CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown, - block_shutdown_signals)) - << "Unable to register SIGINT handler!"; - - // Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate - // rotates our logs. - CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() { - google::CloseLogDestination(google::INFO); - })) << "Unable to register SIGUSR1 handler!"; -} - -/// Run the Memgraph server. -/// -/// Sets up all the required state before running `memgraph_main` and does any -/// required cleanup afterwards. `get_stats_prefix` is used to obtain the -/// prefix when logging Memgraph's statistics. -/// -/// Command line arguments and configuration files are read before calling any -/// of the supplied functions. 
Therefore, you should use flags only from those -/// functions, and *not before* invoking `WithInit`. -/// -/// This should be the first and last thing a OS specific main function does. -/// -/// A common example of usage is: -/// -/// @code -/// int main(int argc, char *argv[]) { -/// auto get_stats_prefix = []() -> std::string { return "memgraph"; }; -/// return WithInit(argc, argv, get_stats_prefix, SingleNodeMain); -/// } -/// @endcode -/// -/// If you wish to start Memgraph server in another way, you can pass a -/// `memgraph_main` functions which does that. You should take care to call -/// `InitSignalHandlers` with appropriate function to shutdown the server you -/// started. -int WithInit(int argc, char **argv, - const std::function &get_stats_prefix, - const std::function &memgraph_main) { - gflags::SetVersionString(version_string); - - // Load config before parsing arguments, so that flags from the command line - // overwrite the config. - LoadConfig(); - gflags::ParseCommandLineFlags(&argc, &argv, true); - - google::InitGoogleLogging(argv[0]); - google::SetLogDestination(google::INFO, FLAGS_log_file.c_str()); - google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str()); - - // Unhandled exception handler init. - std::set_terminate(&utils::TerminateHandler); - - stats::InitStatsLogging(get_stats_prefix()); - utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); }); - - // Initialize the communication library. - communication::Init(); - - // Start memory warning logger. - utils::Scheduler mem_log_scheduler; - if (FLAGS_memory_warning_threshold > 0) { - auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); - if (free_ram) { - mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] { - auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); - if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold) - LOG(WARNING) << "Running out of available RAM, only " - << *free_ram / 1024 << " MB left."; - }); - } else { - // Kernel version for the `MemAvailable` value is from: man procfs - LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc " - "filesystem isn't available so remaining memory warnings " - "won't be available."; - } - } - requests::Init(); - - memgraph_main(); - return 0; -} - void SingleNodeMain() { google::SetUsageMessage("Memgraph single-node database server"); database::SingleNode db; query::Interpreter interpreter; SessionData session_data{&db, &interpreter}; - auto stream_writer = - [&session_data]( - const std::string &query, - const std::map ¶ms) { - auto dba = session_data.db->Access(); - KafkaResultStream stream; - std::map params_tv; - for (const auto &kv : params) - params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); - try { - (*session_data.interpreter)(query, *dba, params_tv, false) - .PullAll(stream); - dba->Commit(); - } catch (const query::QueryException &e) { - LOG(WARNING) << "[Kafka] query execution failed with an exception: " - << e.what(); - dba->Abort(); - } - }; - integrations::kafka::Streams kafka_streams{ std::experimental::filesystem::path(FLAGS_durability_directory) / "streams", - stream_writer}; + [&session_data]( + const std::string &query, + const std::map ¶ms) { + KafkaStreamWriter(session_data, query, params); + }}; try { // Recover possible streams. 
@@ -399,30 +132,14 @@ void MasterMain() { query::DistributedInterpreter interpreter(&db); SessionData session_data{&db, &interpreter}; - auto stream_writer = - [&session_data]( - const std::string &query, - const std::map ¶ms) { - auto dba = session_data.db->Access(); - KafkaResultStream stream; - std::map params_tv; - for (const auto &kv : params) - params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); - try { - (*session_data.interpreter)(query, *dba, params_tv, false) - .PullAll(stream); - dba->Commit(); - } catch (const query::QueryException &e) { - LOG(WARNING) << "[Kafka] query execution failed with an exception: " - << e.what(); - dba->Abort(); - } - }; - integrations::kafka::Streams kafka_streams{ std::experimental::filesystem::path(FLAGS_durability_directory) / "streams", - stream_writer}; + [&session_data]( + const std::string &query, + const std::map ¶ms) { + KafkaStreamWriter(session_data, query, params); + }}; try { // Recover possible streams. @@ -431,6 +148,7 @@ void MasterMain() { LOG(ERROR) << e.what(); } + session_data.interpreter->auth_ = &session_data.auth; session_data.interpreter->kafka_streams_ = &kafka_streams; ServerContext context; diff --git a/src/memgraph_init.cpp b/src/memgraph_init.cpp new file mode 100644 index 000000000..2bde04018 --- /dev/null +++ b/src/memgraph_init.cpp @@ -0,0 +1,206 @@ +#include "memgraph_init.hpp" + +#include + +#include "config.hpp" +#include "glue/auth.hpp" +#include "glue/communication.hpp" +#include "query/exceptions.hpp" +#include "requests/requests.hpp" +#include "stats/stats.hpp" +#include "utils/signals.hpp" +#include "utils/sysinfo/memory.hpp" +#include "utils/terminate_handler.hpp" +#include "version.hpp" + +DEFINE_string(log_file, "", "Path to where the log should be stored."); +DEFINE_HIDDEN_string( + log_link_basename, "", + "Basename used for symlink creation to the last log file."); +DEFINE_uint64(memory_warning_threshold, 1024, + "Memory warning threshold, in MB. If Memgraph detects there is " + "less available RAM it will log a warning. Set to 0 to " + "disable."); + +BoltSession::BoltSession(SessionData &data, + communication::InputStream &input_stream, + communication::OutputStream &output_stream) + : communication::bolt::Session(input_stream, + output_stream), + transaction_engine_(data.db, data.interpreter), + auth_(&data.auth) {} + +using TEncoder = + communication::bolt::Session::TEncoder; + +std::vector BoltSession::Interpret( + const std::string &query, + const std::map ¶ms) { + std::map params_tv; + for (const auto &kv : params) + params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); + try { + auto result = transaction_engine_.Interpret(query, params_tv); + if (user_) { + const auto &permissions = user_->GetPermissions(); + for (const auto &privilege : result.second) { + if (permissions.Has(glue::PrivilegeToPermission(privilege)) != + auth::PermissionLevel::GRANT) { + transaction_engine_.Abort(); + throw communication::bolt::ClientError( + "You are not authorized to execute this query! Please contact " + "your database administrator."); + } + } + } + return result.first; + + } catch (const query::QueryException &e) { + // Wrap QueryException into ClientError, because we want to allow the + // client to fix their query. 
+ throw communication::bolt::ClientError(e.what()); + } +} + +std::map BoltSession::PullAll( + TEncoder *encoder) { + try { + TypedValueResultStream stream(encoder); + const auto &summary = transaction_engine_.PullAll(&stream); + std::map decoded_summary; + for (const auto &kv : summary) { + decoded_summary.emplace(kv.first, glue::ToBoltValue(kv.second)); + } + return decoded_summary; + } catch (const query::QueryException &e) { + // Wrap QueryException into ClientError, because we want to allow the + // client to fix their query. + throw communication::bolt::ClientError(e.what()); + } +} + +void BoltSession::Abort() { transaction_engine_.Abort(); } + +bool BoltSession::Authenticate(const std::string &username, + const std::string &password) { + if (!auth_->HasUsers()) return true; + user_ = auth_->Authenticate(username, password); + return !!user_; +} + +BoltSession::TypedValueResultStream::TypedValueResultStream(TEncoder *encoder) + : encoder_(encoder) {} + +void BoltSession::TypedValueResultStream::Result( + const std::vector &values) { + std::vector decoded_values; + decoded_values.reserve(values.size()); + for (const auto &v : values) { + decoded_values.push_back(glue::ToBoltValue(v)); + } + encoder_->MessageRecord(decoded_values); +} + +void KafkaStreamWriter( + SessionData &session_data, const std::string &query, + const std::map ¶ms) { + auto dba = session_data.db->Access(); + KafkaResultStream stream; + std::map params_tv; + for (const auto &kv : params) + params_tv.emplace(kv.first, glue::ToTypedValue(kv.second)); + try { + (*session_data.interpreter)(query, *dba, params_tv, false).PullAll(stream); + dba->Commit(); + } catch (const query::QueryException &e) { + LOG(WARNING) << "[Kafka] query execution failed with an exception: " + << e.what(); + dba->Abort(); + } +}; + +// Needed to correctly handle memgraph destruction from a signal handler. +// Without having some sort of a flag, it is possible that a signal is handled +// when we are exiting main, inside destructors of database::GraphDb and +// similar. The signal handler may then initiate another shutdown on memgraph +// which is in half destructed state, causing invalid memory access and crash. +volatile sig_atomic_t is_shutting_down = 0; + +void InitSignalHandlers(const std::function &shutdown_fun) { + // Prevent handling shutdown inside a shutdown. For example, SIGINT handler + // being interrupted by SIGTERM before is_shutting_down is set, thus causing + // double shutdown. + sigset_t block_shutdown_signals; + sigemptyset(&block_shutdown_signals); + sigaddset(&block_shutdown_signals, SIGTERM); + sigaddset(&block_shutdown_signals, SIGINT); + + // Wrap the shutdown function in a safe way to prevent recursive shutdown. + auto shutdown = [shutdown_fun]() { + if (is_shutting_down) return; + is_shutting_down = 1; + shutdown_fun(); + }; + + CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Terminate, + shutdown, block_shutdown_signals)) + << "Unable to register SIGTERM handler!"; + CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::Interupt, shutdown, + block_shutdown_signals)) + << "Unable to register SIGINT handler!"; + + // Setup SIGUSR1 to be used for reopening log files, when e.g. logrotate + // rotates our logs. 
+ CHECK(utils::SignalHandler::RegisterHandler(utils::Signal::User1, []() { + google::CloseLogDestination(google::INFO); + })) << "Unable to register SIGUSR1 handler!"; +} + +int WithInit(int argc, char **argv, + const std::function &get_stats_prefix, + const std::function &memgraph_main) { + gflags::SetVersionString(version_string); + + // Load config before parsing arguments, so that flags from the command line + // overwrite the config. + LoadConfig(); + gflags::ParseCommandLineFlags(&argc, &argv, true); + + google::InitGoogleLogging(argv[0]); + google::SetLogDestination(google::INFO, FLAGS_log_file.c_str()); + google::SetLogSymlink(google::INFO, FLAGS_log_link_basename.c_str()); + + // Unhandled exception handler init. + std::set_terminate(&utils::TerminateHandler); + + stats::InitStatsLogging(get_stats_prefix()); + utils::OnScopeExit stop_stats([] { stats::StopStatsLogging(); }); + + // Initialize the communication library. + communication::Init(); + + // Start memory warning logger. + utils::Scheduler mem_log_scheduler; + if (FLAGS_memory_warning_threshold > 0) { + auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); + if (free_ram) { + mem_log_scheduler.Run("Memory warning", std::chrono::seconds(3), [] { + auto free_ram = utils::sysinfo::AvailableMemoryKilobytes(); + if (free_ram && *free_ram / 1024 < FLAGS_memory_warning_threshold) + LOG(WARNING) << "Running out of available RAM, only " + << *free_ram / 1024 << " MB left."; + }); + } else { + // Kernel version for the `MemAvailable` value is from: man procfs + LOG(WARNING) << "You have an older kernel version (<3.14) or the /proc " + "filesystem isn't available so remaining memory warnings " + "won't be available."; + } + } + requests::Init(); + + memgraph_main(); + return 0; +} diff --git a/src/memgraph_init.hpp b/src/memgraph_init.hpp new file mode 100644 index 000000000..ffbec8a39 --- /dev/null +++ b/src/memgraph_init.hpp @@ -0,0 +1,115 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "auth/auth.hpp" +#include "communication/bolt/v1/session.hpp" +#include "distributed/pull_rpc_clients.hpp" +#include "query/interpreter.hpp" +#include "query/transaction_engine.hpp" + +DECLARE_string(durability_directory); + +/// Encapsulates Dbms and Interpreter that are passed through the network server +/// and worker to the session. +struct SessionData { + database::GraphDb *db{nullptr}; + query::Interpreter *interpreter{nullptr}; + auth::Auth auth{ + std::experimental::filesystem::path(FLAGS_durability_directory) / "auth"}; +}; + +class BoltSession final + : public communication::bolt::Session { + public: + BoltSession(SessionData &data, communication::InputStream &input_stream, + communication::OutputStream &output_stream); + + using communication::bolt::Session::TEncoder; + + std::vector Interpret( + const std::string &query, + const std::map ¶ms) override; + + std::map PullAll( + TEncoder *encoder) override; + + void Abort() override; + + bool Authenticate(const std::string &username, + const std::string &password) override; + + private: + /// Wrapper around TEncoder which converts TypedValue to Value + /// before forwarding the calls to original TEncoder. + class TypedValueResultStream { + public: + TypedValueResultStream(TEncoder *encoder); + + void Result(const std::vector &values); + + private: + TEncoder *encoder_; + }; + + query::TransactionEngine transaction_engine_; + auth::Auth *auth_; + std::experimental::optional user_; +}; + +/// Class that implements ResultStream API for Kafka. 
+///
+/// Kafka doesn't need to stream the import results back to the client so we
+/// don't need any functionality here.
+class KafkaResultStream {
+ public:
+  void Result(const std::vector &) {}
+};
+
+/// Writes data streamed from Kafka to Memgraph.
+void KafkaStreamWriter(
+    SessionData &session_data, const std::string &query,
+    const std::map &params);
+
+/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
+/// In most cases you don't have to call this. If you are using a custom server
+/// startup function for `WithInit`, then you probably need to use this to
+/// shut down your server.
+void InitSignalHandlers(const std::function<void()> &shutdown_fun);
+
+/// Run the Memgraph server.
+///
+/// Sets up all the required state before running `memgraph_main` and does any
+/// required cleanup afterwards. `get_stats_prefix` is used to obtain the
+/// prefix when logging Memgraph's statistics.
+///
+/// Command line arguments and configuration files are read before calling any
+/// of the supplied functions. Therefore, you should use flags only from those
+/// functions, and *not before* invoking `WithInit`.
+///
+/// This should be the first and last thing an OS-specific main function does.
+///
+/// A common example of usage is:
+///
+/// @code
+/// int main(int argc, char *argv[]) {
+///   auto get_stats_prefix = []() -> std::string { return "memgraph"; };
+///   return WithInit(argc, argv, get_stats_prefix, SingleNodeMain);
+/// }
+/// @endcode
+///
+/// If you wish to start the Memgraph server in another way, you can pass a
+/// `memgraph_main` function which does that. You should take care to call
+/// `InitSignalHandlers` with an appropriate function to shut down the server
+/// you started.
+int WithInit(int argc, char **argv,
+             const std::function<std::string()> &get_stats_prefix,
+             const std::function<void()> &memgraph_main);
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index aa8fe5214..82bfddb7e 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -27,3 +27,6 @@ add_subdirectory(property_based)
 
 # integration test binaries
 add_subdirectory(integration)
+
+# feature benchmark test binaries
+add_subdirectory(feature_benchmark)
diff --git a/tests/feature_benchmark/CMakeLists.txt b/tests/feature_benchmark/CMakeLists.txt
new file mode 100644
index 000000000..ff0319854
--- /dev/null
+++ b/tests/feature_benchmark/CMakeLists.txt
@@ -0,0 +1,2 @@
+# kafka test binaries
+add_subdirectory(kafka)
diff --git a/tests/feature_benchmark/apollo_runs.yaml b/tests/feature_benchmark/apollo_runs.yaml
new file mode 100644
index 000000000..4f3e92c8c
--- /dev/null
+++ b/tests/feature_benchmark/apollo_runs.yaml
@@ -0,0 +1,10 @@
+- name: feature_benchmark__kafka
+  cd: kafka
+  commands: ./runner.sh
+  infiles:
+    - runner.sh # runner script
+    - transform.py # transform script
+    - generate.py # dataset generator script
+    - ../../../build_release/tests/feature_benchmark/kafka/kafka.py # kafka script
+    - ../../../build_release/tests/feature_benchmark/kafka/benchmark # benchmark binary
+  enable_network: true
diff --git a/tests/feature_benchmark/kafka/CMakeLists.txt b/tests/feature_benchmark/kafka/CMakeLists.txt
new file mode 100644
index 000000000..b79fd5f3e
--- /dev/null
+++ b/tests/feature_benchmark/kafka/CMakeLists.txt
@@ -0,0 +1,9 @@
+set(target_name memgraph__feature_benchmark__kafka)
+
+set(benchmark_target_name ${target_name}__benchmark)
+add_executable(${benchmark_target_name} benchmark.cpp)
+set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark)
+target_link_libraries(${benchmark_target_name} memgraph_lib kvstore_lib)
+
+# Copy kafka.py to the feature integration kafka folder
+configure_file(${PROJECT_SOURCE_DIR}/src/integrations/kafka/kafka.py ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)
diff --git a/tests/feature_benchmark/kafka/benchmark.cpp b/tests/feature_benchmark/kafka/benchmark.cpp
new file mode 100644
index 000000000..50ac9c17d
--- /dev/null
+++ b/tests/feature_benchmark/kafka/benchmark.cpp
@@ -0,0 +1,108 @@
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+#include 
+
+#include 
+#include 
+#include "json/json.hpp"
+
+#include "database/graph_db.hpp"
+#include "integrations/kafka/streams.hpp"
+#include "memgraph_init.hpp"
+#include "utils/flag_validation.hpp"
+#include "utils/timer.hpp"
+
+using namespace std::literals::chrono_literals;
+
+DEFINE_int64(import_count, 0, "How many entries should we import.");
+DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait.");
+DEFINE_string(kafka_uri, "", "Kafka URI.");
+DEFINE_string(topic_name, "", "Kafka topic.");
+DEFINE_string(transform_uri, "", "Transform script URI.");
+DEFINE_string(output_file, "", "Output file where the results should be stored.");
+
+void KafkaBenchmarkMain() {
+  google::SetUsageMessage("Memgraph kafka benchmark database server");
+  query::Interpreter interpreter;
+  database::SingleNode db;
+  SessionData session_data{&db, &interpreter};
+
+  std::atomic<int64_t> query_counter{0};
+  std::atomic<bool> timeout_reached{false};
+  std::atomic<bool> benchmark_finished{false};
+
+  integrations::kafka::Streams kafka_streams{
+      std::experimental::filesystem::path(FLAGS_durability_directory) /
+          "streams",
+      [&session_data, &query_counter](
+          const std::string &query,
+          const std::map &params) {
+        KafkaStreamWriter(session_data, query, params);
+        query_counter++;
+      }};
+
+  session_data.interpreter->auth_ = &session_data.auth;
+  session_data.interpreter->kafka_streams_ = &kafka_streams;
+
+  std::string stream_name = "benchmark";
+
+  integrations::kafka::StreamInfo stream_info;
+  stream_info.stream_name = stream_name;
+  stream_info.stream_uri = FLAGS_kafka_uri;
+  stream_info.stream_topic = FLAGS_topic_name;
+  stream_info.transform_uri = FLAGS_transform_uri;
+
+  kafka_streams.Create(stream_info);
+  kafka_streams.StartAll();
+
+  // Kick off a thread that will time out after FLAGS_timeout seconds
+  std::thread timeout_thread_ =
+      std::thread([&timeout_reached, &benchmark_finished]() {
+        utils::ThreadSetName("BenchTimeout");
+        for (int64_t i = 0; i < FLAGS_timeout; ++i) {
+          std::this_thread::sleep_for(1s);
+          if (benchmark_finished.load()) return;
+        }
+
+        timeout_reached.store(true);
+      });
+
+  // Wait for the import to start
+  while (!timeout_reached.load() && query_counter.load() == 0) {
+    std::this_thread::sleep_for(1ms);
+  }
+
+  int64_t query_count_start = query_counter.load();
+  utils::Timer timer;
+
+  // Wait for the import to finish
+  while (!timeout_reached.load() && query_counter.load() < FLAGS_import_count) {
+    std::this_thread::sleep_for(1ms);
+  }
+
+  double duration = timer.Elapsed().count();
+  kafka_streams.StopAll();
+  kafka_streams.Drop(stream_name);
+
+  benchmark_finished.store(true);
+  if (timeout_thread_.joinable()) timeout_thread_.join();
+
+  int64_t writes = query_counter.load() - query_count_start;
+  double write_per_second = writes / duration;
+
+  std::ofstream output(FLAGS_output_file);
+  output << "duration " << duration << std::endl;
+  output << "executed_writes " << writes << std::endl;
+  output << "write_per_second " << write_per_second <<
std::endl; + output.close(); +} + +int main(int argc, char **argv) { + return WithInit(argc, argv, []() { return "memgraph"; }, KafkaBenchmarkMain); +} diff --git a/tests/feature_benchmark/kafka/generate.py b/tests/feature_benchmark/kafka/generate.py new file mode 100644 index 000000000..d70bb59e4 --- /dev/null +++ b/tests/feature_benchmark/kafka/generate.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +''' +Kafka benchmark dataset generator. +''' + + +import random +import sys +from argparse import ArgumentParser + + +def get_edge(): + from_node = random.randint(0, args.nodes) + to_node = random.randint(0, args.nodes) + + while from_node == to_node: + to_node = random.randint(0, args.nodes) + return (from_node, to_node) + + +def parse_args(): + argp = ArgumentParser(description=__doc__) + argp.add_argument("--nodes", type=int, default=100, help="Number of nodes.") + argp.add_argument("--edges", type=int, default=30, help="Number of edges.") + return argp.parse_args() + + +args = parse_args() +edges = set() + +for i in range(args.nodes): + print("%d\n" % i) + +for i in range(args.edges): + edge = get_edge() + while edge in edges: + edge = get_edge() + + edges.add(edge) + print("%d %d\n" % edge) + +sys.exit(0) diff --git a/tests/feature_benchmark/kafka/runner.sh b/tests/feature_benchmark/kafka/runner.sh new file mode 100755 index 000000000..6fb77ade2 --- /dev/null +++ b/tests/feature_benchmark/kafka/runner.sh @@ -0,0 +1,143 @@ +#!/bin/bash + +## Helper functions + +function wait_for_server { + port=$1 + while ! nc -z -w 1 127.0.0.1 $port; do + sleep 0.1 + done + sleep 1 +} + +function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; } +function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; } +function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; } + + +## Environment setup + +# Get script location. +DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +cd "$DIR" + +# Create a temporary directory. +tmpdir=/tmp/memgraph_benchmark_kafka +if [ -d $tmpdir ]; then + rm -rf $tmpdir +fi +mkdir -p $tmpdir +cd $tmpdir + +# Download the kafka binaries. +kafka="kafka_2.11-2.0.0" +wget -nv http://deps.memgraph.io/$kafka.tgz +tar -xf $kafka.tgz +mv $kafka kafka + +# Find memgraph binaries. +binary_dir="$DIR/../../../build" +if [ ! -d $binary_dir ]; then + binary_dir="$DIR/../../../build_release" +fi + +# Cleanup old kafka logs. +if [ -d /tmp/kafka-logs ]; then + rm -rf /tmp/kafka-logs +fi +if [ -d /tmp/zookeeper ]; then + rm -rf /tmp/zookeeper +fi + +# Results for apollo +RESULTS="$DIR/.apollo_measurements" + +# Benchmark parameters +NODES=100000 +EDGES=10000 + + +## Startup + +# Start the zookeeper process and wait for it to start. +echo_info "Starting zookeeper" +./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties +wait_for_server 2181 +echo_success "Started zookeeper" + +# Start the kafka process and wait for it to start. +echo_info "Starting kafka" +./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties +wait_for_server 9092 +echo_success "Started kafka" + +# Create the kafka topic. +echo_info "Creating kafka topic test" +./kafka/bin/kafka-topics.sh --create \ + --zookeeper 127.0.0.1:2181 \ + --replication-factor 1 \ + --partitions 1 \ + --topic test +echo_success "Created kafka topic test" + +# Start a http server to serve the transform script. +echo_info "Starting Python HTTP server" +mkdir serve +cd serve +cp "$DIR/transform.py" transform.py +python3 -m http.server & +http_server_pid=$! 
+wait_for_server 8000 +cd .. +echo_success "Started Python HTTP server" + + +# Start the memgraph process and wait for it to start. +echo_info "Starting kafka benchmark" +$binary_dir/tests/feature_benchmark/kafka/benchmark \ + --import-count=$(($NODES + $EDGES)) \ + --timeout=60 \ + --kafka-uri="127.0.0.1:9092" \ + --topic-name="test" \ + --transform-uri="127.0.0.1:8000/transform.py" \ + --output-file=$RESULTS & +pid=$! + +# Allow benchmark to initialize +sleep 5 + +echo_info "Generating kafka messages" +python3 "$DIR/generate.py" --nodes $NODES --edges $EDGES | \ + ./kafka/bin/kafka-console-producer.sh \ + --broker-list 127.0.0.1:9092 \ + --topic test \ + > /dev/null +echo_success "Finished generating kafka messages" + +wait -n $pid +code=$? + +if [ $code -eq 0 ]; then + echo_success "Benchmark finished successfully" +else + echo_failure "Benchmark didn't finish successfully" +fi + +## Cleanup + +echo_info "Starting test cleanup" + +# Shutdown the http server. +kill $http_server_pid +wait -n + +# Shutdown the kafka process. +./kafka/bin/kafka-server-stop.sh + +# Shutdown the zookeeper process. +./kafka/bin/zookeeper-server-stop.sh + +echo_success "Test cleanup done" + +[ $code -ne 0 ] && exit $code +exit 0 diff --git a/tests/feature_benchmark/kafka/transform.py b/tests/feature_benchmark/kafka/transform.py new file mode 100644 index 000000000..1fd87ca9b --- /dev/null +++ b/tests/feature_benchmark/kafka/transform.py @@ -0,0 +1,15 @@ +index_done = False + +def stream(batch): + global index_done + ret = [] + if not index_done: + ret.append(("CREATE INDEX ON :node(num)", {})) + index_done = True + for item in batch: + message = item.decode("utf-8").strip().split() + if len(message) == 1: + ret.append(("CREATE (:node {num: $num})", {"num": message[0]})) + elif len(message) == 2: + ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) + return ret
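As a quick illustration (separate from the diff above), transform.py's stream() can be exercised locally to see the (query, parameters) pairs it would hand to the Kafka stream writer. This is a minimal sketch that assumes stream is in scope, e.g. by appending the lines to a copy of transform.py; the sample byte messages mirror the generate.py output format, a single id for a node message and two ids for an edge message.

messages = [b"0", b"1", b"0 1"]  # two node messages and one edge message
for query, params in stream(messages):
    print(query, params)
# The first batch also emits the index query, so this prints:
#   CREATE INDEX ON :node(num) {}
#   CREATE (:node {num: $num}) {'num': '0'}
#   CREATE (:node {num: $num}) {'num': '1'}
#   MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m) {'num1': '0', 'num2': '1'}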