From 42516afce8cb7133e489cbfc30189dbcf01514e2 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Mon, 28 Oct 2019 11:11:00 +0100 Subject: [PATCH] Remove Kafka integration implementation and tests Reviewers: teon.banek Reviewed By: teon.banek Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2525 --- libs/CMakeLists.txt | 16 - libs/setup.sh | 5 - src/CMakeLists.txt | 7 +- src/auth/models.cpp | 2 - src/auth/models.hpp | 3 +- src/glue/auth.cpp | 2 - src/integrations/CMakeLists.txt | 2 - src/integrations/kafka/CMakeLists.txt | 21 - src/integrations/kafka/consumer.cpp | 334 --------- src/integrations/kafka/consumer.hpp | 144 ---- src/integrations/kafka/exceptions.hpp | 124 ---- src/integrations/kafka/kafka.py | 162 ----- src/integrations/kafka/streams.cpp | 274 -------- src/integrations/kafka/streams.hpp | 127 ---- src/integrations/kafka/transform.cpp | 654 ------------------ src/integrations/kafka/transform.hpp | 65 -- src/memgraph.cpp | 18 - src/memgraph_init.cpp | 22 - src/memgraph_init.hpp | 14 - src/query/frontend/ast/ast.lcp | 58 +- src/query/frontend/ast/ast_visitor.hpp | 3 +- .../frontend/ast/cypher_main_visitor.cpp | 156 ----- .../frontend/ast/cypher_main_visitor.hpp | 63 -- .../opencypher/grammar/MemgraphCypher.g4 | 53 +- .../opencypher/grammar/MemgraphCypherLexer.g4 | 14 - .../frontend/semantic/required_privileges.cpp | 4 - .../frontend/stripped_lexer_constants.hpp | 5 +- src/query/interpreter.cpp | 179 ----- src/query/interpreter.hpp | 5 - tests/feature_benchmark/CMakeLists.txt | 3 - tests/feature_benchmark/apollo_runs.yaml | 11 - tests/feature_benchmark/kafka/CMakeLists.txt | 9 - tests/feature_benchmark/kafka/benchmark.cpp | 116 ---- tests/feature_benchmark/kafka/generate.py | 44 -- tests/feature_benchmark/kafka/runner.sh | 143 ---- tests/feature_benchmark/kafka/transform.py | 15 - tests/integration/CMakeLists.txt | 3 - tests/integration/apollo_runs.yaml | 11 - tests/integration/auth/runner.py | 35 - tests/integration/kafka/CMakeLists.txt | 6 - tests/integration/kafka/runner.sh | 163 ----- tests/integration/kafka/tester.cpp | 81 --- tests/integration/kafka/transform.py | 15 - tests/unit/cypher_main_visitor.cpp | 256 ------- tests/unit/query_common.hpp | 34 - tests/unit/query_required_privileges.cpp | 22 - 46 files changed, 10 insertions(+), 3493 deletions(-) delete mode 100644 src/integrations/CMakeLists.txt delete mode 100644 src/integrations/kafka/CMakeLists.txt delete mode 100644 src/integrations/kafka/consumer.cpp delete mode 100644 src/integrations/kafka/consumer.hpp delete mode 100644 src/integrations/kafka/exceptions.hpp delete mode 100644 src/integrations/kafka/kafka.py delete mode 100644 src/integrations/kafka/streams.cpp delete mode 100644 src/integrations/kafka/streams.hpp delete mode 100644 src/integrations/kafka/transform.cpp delete mode 100644 src/integrations/kafka/transform.hpp delete mode 100644 tests/feature_benchmark/kafka/CMakeLists.txt delete mode 100644 tests/feature_benchmark/kafka/benchmark.cpp delete mode 100644 tests/feature_benchmark/kafka/generate.py delete mode 100755 tests/feature_benchmark/kafka/runner.sh delete mode 100644 tests/feature_benchmark/kafka/transform.py delete mode 100644 tests/integration/kafka/CMakeLists.txt delete mode 100755 tests/integration/kafka/runner.sh delete mode 100644 tests/integration/kafka/tester.cpp delete mode 100644 tests/integration/kafka/transform.py diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index 5c21835d0..6507257aa 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt 
@@ -205,22 +205,6 @@ import_external_library(rocksdb STATIC -DCMAKE_SKIP_INSTALL_ALL_DEPENDENCY=true BUILD_COMMAND $(MAKE) rocksdb) -# Setup librdkafka. -import_external_library(librdkafka STATIC - ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a - ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include/librdkafka - CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON - -DRDKAFKA_BUILD_EXAMPLES=OFF - -DRDKAFKA_BUILD_TESTS=OFF - -DCMAKE_INSTALL_LIBDIR=lib - -DWITH_SSL=ON - # If we want SASL, we need to install it on build machines - -DWITH_SASL=OFF) - -import_library(librdkafka++ STATIC - ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a - librdkafka-proj) - # Setup libbcrypt import_external_library(libbcrypt STATIC ${CMAKE_CURRENT_SOURCE_DIR}/libbcrypt/bcrypt.a diff --git a/libs/setup.sh b/libs/setup.sh index 4ff35dfd5..5d9163b9e 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -136,11 +136,6 @@ sed -i 's/-Wshadow/-Wno-defaulted-function-deleted/' rocksdb/CMakeLists.txt # remove shared library from install dependencies sed -i 's/TARGETS ${ROCKSDB_SHARED_LIB}/TARGETS ${ROCKSDB_SHARED_LIB} OPTIONAL/' rocksdb/CMakeLists.txt -# kafka -kafka_tag="c319b4e987d0bc4fe4f01cf91419d90b62061655" # Mar 8, 2018 -# git clone https://github.com/edenhill/librdkafka.git -clone git://deps.memgraph.io/librdkafka.git librdkafka $kafka_tag - # mgclient mgclient_tag="fe94b3631385ef5dbe40a3d8458860dbcc33e6ea" # May 27, 2019 # git clone https://github.com/memgraph/mgclient.git diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1cdc54a0f..0f9c8ee09 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,7 +4,6 @@ add_subdirectory(lisp) add_subdirectory(utils) add_subdirectory(requests) -add_subdirectory(integrations) add_subdirectory(io) add_subdirectory(telemetry) add_subdirectory(communication) @@ -88,7 +87,7 @@ set(MG_SINGLE_NODE_LIBS stdc++fs Threads::Threads fmt cppitertools antlr_opencypher_parser_lib dl glog gflags mg-utils mg-io mg-requests mg-communication) # These are enterprise subsystems -set(MG_SINGLE_NODE_LIBS ${MG_SINGLE_NODE_LIBS} mg-integrations-kafka mg-auth) +set(MG_SINGLE_NODE_LIBS ${MG_SINGLE_NODE_LIBS} mg-auth) if (USE_LTALLOC) list(APPEND MG_SINGLE_NODE_LIBS ltalloc) @@ -146,7 +145,7 @@ set(MG_SINGLE_NODE_V2_LIBS stdc++fs Threads::Threads fmt cppitertools antlr_opencypher_parser_lib dl glog gflags mg-storage-v2 mg-utils mg-io mg-requests mg-communication) # These are enterprise subsystems -set(MG_SINGLE_NODE_V2_LIBS ${MG_SINGLE_NODE_V2_LIBS} mg-integrations-kafka mg-auth) +set(MG_SINGLE_NODE_V2_LIBS ${MG_SINGLE_NODE_V2_LIBS} mg-auth) if (USE_LTALLOC) list(APPEND MG_SINGLE_NODE_V2_LIBS ltalloc) @@ -230,7 +229,7 @@ add_custom_target(generate_lcp_single_node_ha DEPENDS generate_lcp_common ${gene set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools antlr_opencypher_parser_lib dl glog gflags - mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-comm-rpc + mg-utils mg-io mg-requests mg-communication mg-comm-rpc mg-auth) if (USE_LTALLOC) diff --git a/src/auth/models.cpp b/src/auth/models.cpp index 9f816304d..a15fb703f 100644 --- a/src/auth/models.cpp +++ b/src/auth/models.cpp @@ -42,8 +42,6 @@ std::string PermissionToString(Permission permission) { return "DUMP"; case Permission::AUTH: return "AUTH"; - case Permission::STREAM: - return "STREAM"; } } diff --git a/src/auth/models.hpp b/src/auth/models.hpp index b8e43a69d..3475a8013 100644 --- a/src/auth/models.hpp +++ b/src/auth/models.hpp @@ -21,7 +21,6 @@ enum class Permission : uint64_t { CONSTRAINT = 
0x00000100, DUMP = 0x00000200, AUTH = 0x00010000, - STREAM = 0x00020000, }; // Constant list of all available permissions. @@ -29,7 +28,7 @@ const std::vector kPermissionsAll = { Permission::MATCH, Permission::CREATE, Permission::MERGE, Permission::DELETE, Permission::SET, Permission::REMOVE, Permission::INDEX, Permission::STATS, Permission::CONSTRAINT, - Permission::DUMP, Permission::AUTH, Permission::STREAM}; + Permission::DUMP, Permission::AUTH}; // Function that converts a permission to its string representation. std::string PermissionToString(Permission permission); diff --git a/src/glue/auth.cpp b/src/glue/auth.cpp index 8984a8c0c..52e7eed13 100644 --- a/src/glue/auth.cpp +++ b/src/glue/auth.cpp @@ -26,8 +26,6 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) { return auth::Permission::DUMP; case query::AuthQuery::Privilege::AUTH: return auth::Permission::AUTH; - case query::AuthQuery::Privilege::STREAM: - return auth::Permission::STREAM; } } } diff --git a/src/integrations/CMakeLists.txt b/src/integrations/CMakeLists.txt deleted file mode 100644 index 44a7e0b80..000000000 --- a/src/integrations/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -# kafka integration -add_subdirectory(kafka) diff --git a/src/integrations/kafka/CMakeLists.txt b/src/integrations/kafka/CMakeLists.txt deleted file mode 100644 index c48ac0f9d..000000000 --- a/src/integrations/kafka/CMakeLists.txt +++ /dev/null @@ -1,21 +0,0 @@ -set(integrations_kafka_src_files - consumer.cpp - transform.cpp - streams.cpp) - -find_package(Seccomp) -if (NOT SECCOMP_FOUND) - message(FATAL_ERROR "Couldn't find seccomp library!") -endif() - -add_library(mg-integrations-kafka STATIC ${integrations_kafka_src_files}) -target_link_libraries(mg-integrations-kafka stdc++fs Threads::Threads fmt - glog gflags librdkafka++ librdkafka zlib json) -target_link_libraries(mg-integrations-kafka mg-utils - mg-requests mg-communication) - -target_link_libraries(mg-integrations-kafka ${Seccomp_LIBRARIES}) -target_include_directories(mg-integrations-kafka SYSTEM PUBLIC ${Seccomp_INCLUDE_DIRS}) - -# Copy kafka.py to the root of our build directory where memgraph executable should be -configure_file(kafka.py ${CMAKE_BINARY_DIR} COPYONLY) diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp deleted file mode 100644 index 8df9aca72..000000000 --- a/src/integrations/kafka/consumer.cpp +++ /dev/null @@ -1,334 +0,0 @@ -#include "integrations/kafka/consumer.hpp" - -#include -#include - -#include "glog/logging.h" - -#include "integrations/kafka/exceptions.hpp" -#include "utils/on_scope_exit.hpp" -#include "utils/thread.hpp" - -namespace integrations::kafka { - -using namespace std::chrono_literals; - -constexpr int64_t kDefaultBatchIntervalMillis = 100; -constexpr int64_t kDefaultBatchSize = 1000; -constexpr int64_t kDefaultTestBatchLimit = 1; - -void Consumer::event_cb(RdKafka::Event &event) { - switch (event.type()) { - case RdKafka::Event::Type::EVENT_ERROR: - LOG(WARNING) << "[Kafka] stream " << info_.stream_name << " ERROR (" - << RdKafka::err2str(event.err()) << "): " << event.str(); - break; - default: - break; - } -} - -Consumer::Consumer( - const StreamInfo &info, const std::string &transform_script_path, - std::function< - void(const std::string &, - const std::map &)> - stream_writer) - : info_(info), - transform_script_path_(transform_script_path), - stream_writer_(stream_writer) { - std::unique_ptr conf( - RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)); - std::string error; - - if 
(conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) { - throw ConsumerFailedToInitializeException(info_.stream_name, error); - } - - if (conf->set("enable.partition.eof", "false", error) != - RdKafka::Conf::CONF_OK) { - throw ConsumerFailedToInitializeException(info_.stream_name, error); - } - - if (conf->set("bootstrap.servers", info_.stream_uri, error) != - RdKafka::Conf::CONF_OK) { - throw ConsumerFailedToInitializeException(info_.stream_name, error); - } - - if (conf->set("group.id", "mg", error) != RdKafka::Conf::CONF_OK) { - throw ConsumerFailedToInitializeException(info_.stream_name, error); - } - - consumer_ = std::unique_ptr>( - RdKafka::KafkaConsumer::create(conf.get(), error), - [this](auto *consumer) { - this->StopConsuming(); - consumer->close(); - delete consumer; - }); - - if (!consumer_) { - throw ConsumerFailedToInitializeException(info_.stream_name, error); - } - - // Try fetching metadata first and check if topic exists. - RdKafka::ErrorCode err; - RdKafka::Metadata *raw_metadata = nullptr; - err = consumer_->metadata(true, nullptr, &raw_metadata, 1000); - std::unique_ptr metadata(raw_metadata); - if (err != RdKafka::ERR_NO_ERROR) { - throw ConsumerFailedToInitializeException(info_.stream_name, - RdKafka::err2str(err)); - } - - bool topic_found = false; - for (const auto &topic_metadata : *metadata->topics()) { - if (topic_metadata->topic() == info_.stream_topic) { - topic_found = true; - break; - } - } - - if (!topic_found) { - throw TopicNotFoundException(info_.stream_name); - } - - err = consumer_->subscribe({info_.stream_topic}); - if (err != RdKafka::ERR_NO_ERROR) { - throw ConsumerFailedToInitializeException(info_.stream_name, - RdKafka::err2str(err)); - } -} - -void Consumer::StopConsuming() { - is_running_.store(false); - if (thread_.joinable()) thread_.join(); - - // Set limit_batches to nullopt since it's not running anymore. - info_.limit_batches = std::nullopt; -} - -void Consumer::StartConsuming(std::optional limit_batches) { - info_.limit_batches = limit_batches; - is_running_.store(true); - - thread_ = std::thread([this, limit_batches]() { - utils::ThreadSetName("StreamKafka"); - - int64_t batch_count = 0; - Transform transform(transform_script_path_); - - transform_alive_.store(false); - if (!transform.Start()) { - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " couldn't start the transform script!"; - return; - } - transform_alive_.store(true); - - while (is_running_) { - auto batch = this->GetBatch(); - - if (batch.empty()) continue; - - DLOG(INFO) << "[Kafka] stream " << info_.stream_name - << " processing a batch"; - - // All exceptions that could be possibly thrown by the `Apply` function - // must be handled here because they *will* crash the database if - // uncaught! - // TODO (mferencevic): Figure out what to do with all other exceptions. 
- try { - transform.Apply(batch, stream_writer_); - } catch (const TransformExecutionException &) { - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " the transform process has died!"; - break; - } catch (const utils::BasicException &e) { - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " the transform process received an exception: " - << e.what(); - break; - } - - if (limit_batches != std::nullopt) { - if (limit_batches <= ++batch_count) { - is_running_.store(false); - break; - } - } - } - - transform_alive_.store(false); - }); -} - -std::vector> Consumer::GetBatch() { - std::vector> batch; - auto start = std::chrono::system_clock::now(); - int64_t remaining_timeout_in_ms = - info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis); - int64_t batch_size = info_.batch_size.value_or(kDefaultBatchSize); - - batch.reserve(batch_size); - - bool run_batch = true; - for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) { - std::unique_ptr msg( - consumer_->consume(remaining_timeout_in_ms)); - switch (msg->err()) { - case RdKafka::ERR__TIMED_OUT: - run_batch = false; - break; - - case RdKafka::ERR_NO_ERROR: - batch.emplace_back(std::move(msg)); - break; - - default: - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " consumer error: " << msg->errstr(); - run_batch = false; - is_running_.store(false); - break; - } - - if (!run_batch) { - break; - } - - auto now = std::chrono::system_clock::now(); - auto took = - std::chrono::duration_cast(now - start); - remaining_timeout_in_ms = remaining_timeout_in_ms - took.count(); - start = now; - } - - return batch; -} - -void Consumer::Start(std::optional limit_batches) { - if (!consumer_) { - throw ConsumerNotAvailableException(info_.stream_name); - } - - if (is_running_) { - throw ConsumerRunningException(info_.stream_name); - } - - StartConsuming(limit_batches); -} - -void Consumer::Stop() { - if (!consumer_) { - throw ConsumerNotAvailableException(info_.stream_name); - } - - if (!is_running_) { - throw ConsumerStoppedException(info_.stream_name); - } - - StopConsuming(); -} - -void Consumer::StartIfStopped() { - if (!consumer_) { - throw ConsumerNotAvailableException(info_.stream_name); - } - - if (!is_running_) { - StartConsuming(std::nullopt); - } -} - -void Consumer::StopIfRunning() { - if (!consumer_) { - throw ConsumerNotAvailableException(info_.stream_name); - } - - if (is_running_) { - StopConsuming(); - } -} - -std::vector< - std::pair>> -Consumer::Test(std::optional limit_batches) { - // All exceptions thrown here are handled by the Bolt protocol. - if (!consumer_) { - throw ConsumerNotAvailableException(info_.stream_name); - } - - if (is_running_) { - throw ConsumerRunningException(info_.stream_name); - } - - Transform transform(transform_script_path_); - - int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit); - std::vector< - std::pair>> - results; - - is_running_.store(true); - - utils::OnScopeExit cleanup([this]() { is_running_.store(false); }); - - transform_alive_.store(false); - if (!transform.Start()) { - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " couldn't start the transform script!"; - throw TransformExecutionException("Couldn't start the transform script!"); - } - transform_alive_.store(true); - - for (int64_t i = 0; i < num_of_batches; ++i) { - auto batch = GetBatch(); - - // Exceptions thrown by `Apply` are handled in Bolt. 
- // Wrap the `TransformExecutionException` into a new exception with a - // message that isn't so specific so the user doesn't get confused. - try { - transform.Apply( - batch, - [&results]( - const std::string &query, - const std::map ¶ms) { - results.push_back({query, params}); - }); - } catch (const TransformExecutionException) { - LOG(WARNING) << "[Kafka] stream " << info_.stream_name - << " the transform process has died!"; - throw TransformExecutionException( - "The transform script contains a runtime error!"); - } - } - - transform_alive_.store(false); - - return results; -} - -StreamStatus Consumer::Status() { - StreamStatus ret; - ret.stream_name = info_.stream_name; - ret.stream_uri = info_.stream_uri; - ret.stream_topic = info_.stream_topic; - ret.transform_uri = info_.transform_uri; - if (!is_running_) { - ret.stream_status = "stopped"; - } else if (!transform_alive_) { - ret.stream_status = "error"; - } else { - ret.stream_status = "running"; - } - return ret; -} - -StreamInfo Consumer::Info() { - info_.is_running = is_running_; - return info_; -} - -} // namespace integrations::kafka diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp deleted file mode 100644 index 8e83f8abe..000000000 --- a/src/integrations/kafka/consumer.hpp +++ /dev/null @@ -1,144 +0,0 @@ -/// @file -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "rdkafkacpp.h" - -#include "communication/bolt/v1/value.hpp" -#include "integrations/kafka/transform.hpp" - -namespace integrations { -namespace kafka { - -/// StreamInfo holds all important info about a stream for memgraph. -/// -/// The fields inside this struct are used for serialization and -/// deserialization. -struct StreamInfo { - std::string stream_name; - std::string stream_uri; - std::string stream_topic; - std::string transform_uri; - std::optional batch_interval_in_ms; - std::optional batch_size; - - std::optional limit_batches; - - bool is_running = false; -}; - -/// StreamStatus holds all important info about a stream for a user. -struct StreamStatus { - std::string stream_name; - std::string stream_uri; - std::string stream_topic; - std::string transform_uri; - std::string stream_status; -}; - -/// Memgraphs kafka consumer wrapper. -/// -/// Class Consumer wraps around librdkafka Consumer so it's easier to use it. -/// It extends RdKafka::EventCb in order to listen to error events. -class Consumer final : public RdKafka::EventCb { - public: - Consumer() = delete; - - /// Creates a new consumer with the given parameters. - /// - /// @param info necessary info about a stream - /// @param script_path path on the filesystem where the transform script - /// is stored - /// @param stream_writer custom lambda that knows how to write data to the - /// db - // - /// @throws ConsumerFailedToInitializeException if the consumer can't connect - /// to the Kafka endpoint. - Consumer(const StreamInfo &info, const std::string &transform_script_path, - std::function< - void(const std::string &, - const std::map &)> - stream_writer); - - Consumer(const Consumer &other) = delete; - Consumer(Consumer &&other) = delete; - - Consumer &operator=(const Consumer &other) = delete; - Consumer &operator=(Consumer &&other) = delete; - - /// Starts importing data from a stream to the db. - /// This method will start a new thread which does the import. - /// - /// @param limit_batches if present, the consumer will only import the given - /// number of batches in the db, and stop afterwards. 
- /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized - /// @throws ConsumerRunningException if the consumer is already running - void Start(std::optional limit_batches); - - /// Stops importing data from a stream to the db. - /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized - /// @throws ConsumerStoppedException if the consumer is already stopped - void Stop(); - - /// Starts importing importing from a stream only if the stream is stopped. - /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized - void StartIfStopped(); - - /// Stops importing from a stream only if the stream is running. - /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized - void StopIfRunning(); - - /// Performs a dry-run on a given stream. - /// - /// @param limit_batches the consumer will only test on the given number of - /// batches. If not present, a default value is used. - /// - /// @throws ConsumerNotAvailableException if the consumer isn't initialized - /// @throws ConsumerRunningException if the consumer is alredy running. - std::vector< - std::pair>> - Test(std::optional limit_batches); - - /// Returns the current status of a stream. - StreamStatus Status(); - - /// Returns the info of a stream. - StreamInfo Info(); - - private: - StreamInfo info_; - std::string transform_script_path_; - std::function &)> - stream_writer_; - - std::atomic is_running_{false}; - std::atomic transform_alive_{false}; - std::thread thread_; - - std::unique_ptr> - consumer_; - - void event_cb(RdKafka::Event &event) override; - - void StopConsuming(); - - void StartConsuming(std::optional limit_batches); - - std::vector> GetBatch(); -}; - -} // namespace kafka -} // namespace integrations diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp deleted file mode 100644 index baeae47ba..000000000 --- a/src/integrations/kafka/exceptions.hpp +++ /dev/null @@ -1,124 +0,0 @@ -#pragma once - -#include "utils/exceptions.hpp" - -#include - -namespace integrations::kafka { - -class KafkaStreamException : public utils::BasicException { - using utils::BasicException::BasicException; -}; - -class StreamExistsException : public KafkaStreamException { - public: - explicit StreamExistsException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {} already exists.", stream_name)) {} -}; - -class StreamDoesntExistException : public KafkaStreamException { - public: - explicit StreamDoesntExistException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {} doesn't exist.", stream_name)) {} -}; - -class StreamSerializationException : public KafkaStreamException { - public: - StreamSerializationException() - : KafkaStreamException("Failed to serialize stream data!") {} -}; - -class StreamDeserializationException : public KafkaStreamException { - public: - StreamDeserializationException() - : KafkaStreamException("Failed to deserialize stream data!") {} -}; - -class StreamMetadataCouldNotBeStored : public KafkaStreamException { - public: - explicit StreamMetadataCouldNotBeStored(const std::string &stream_name) - : KafkaStreamException(fmt::format( - "Couldn't persist stream metadata for stream {}", stream_name)) {} -}; - -class StreamMetadataCouldNotBeDeleted : public KafkaStreamException { - public: - explicit StreamMetadataCouldNotBeDeleted(const std::string &stream_name) - : KafkaStreamException(fmt::format( - "Couldn't delete 
persisted stream metadata for stream {}", - stream_name)) {} -}; - -class ConsumerFailedToInitializeException : public KafkaStreamException { - public: - ConsumerFailedToInitializeException(const std::string &stream_name, - const std::string &error) - : KafkaStreamException(fmt::format( - "Failed to initialize kafka stream {} : {}", stream_name, error)) {} -}; - -class ConsumerNotAvailableException : public KafkaStreamException { - public: - explicit ConsumerNotAvailableException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {} not available", stream_name)) {} -}; - -class ConsumerRunningException : public KafkaStreamException { - public: - explicit ConsumerRunningException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {} is already running", stream_name)) {} -}; - -class ConsumerStoppedException : public KafkaStreamException { - public: - explicit ConsumerStoppedException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {} is already stopped", stream_name)) {} -}; - -class TopicNotFoundException : public KafkaStreamException { - public: - explicit TopicNotFoundException(const std::string &stream_name) - : KafkaStreamException( - fmt::format("Kafka stream {}, topic not found", stream_name)) {} -}; - -class TransformScriptNotFoundException : public KafkaStreamException { - public: - explicit TransformScriptNotFoundException(const std::string &stream_name) - : KafkaStreamException(fmt::format( - "Couldn't find transform script for {}", stream_name)) {} -}; - -class TransformScriptDownloadException : public KafkaStreamException { - public: - explicit TransformScriptDownloadException(const std::string &transform_uri) - : KafkaStreamException(fmt::format( - "Couldn't get the transform script from {}", transform_uri)) {} -}; - -class TransformScriptCouldNotBeCreatedException : public KafkaStreamException { - public: - explicit TransformScriptCouldNotBeCreatedException( - const std::string &stream_name) - : KafkaStreamException(fmt::format( - "Couldn't create transform script for stream {}", stream_name)) {} -}; - -class TransformScriptCouldNotBeDeletedException : public KafkaStreamException { - public: - explicit TransformScriptCouldNotBeDeletedException( - const std::string &stream_name) - : KafkaStreamException(fmt::format( - "Couldn't delete transform script for stream {}", stream_name)) {} -}; - -class TransformExecutionException : public KafkaStreamException { - using KafkaStreamException::KafkaStreamException; -}; - -} // namespace integrations::kafka diff --git a/src/integrations/kafka/kafka.py b/src/integrations/kafka/kafka.py deleted file mode 100644 index 749ea8976..000000000 --- a/src/integrations/kafka/kafka.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/python3 - -import os -import struct - -# Import the target transform script. - -import transform - - -# Constants used for communicating with the memgraph process. - -COMMUNICATION_TO_PYTHON_FD = 1000 -COMMUNICATION_FROM_PYTHON_FD = 1002 - - -# Functions used to get data from the memgraph process. 
- -def get_data(num_bytes): - data = bytes() - while len(data) < num_bytes: - data += os.read(COMMUNICATION_TO_PYTHON_FD, num_bytes - len(data)) - return data - - -def get_size(): - fmt = "I" # uint32_t - data = get_data(struct.calcsize(fmt)) - return struct.unpack(fmt, data)[0] - - -def get_batch(): - batch = [] - count = get_size() - for i in range(count): - size = get_size() - batch.append(get_data(size)) - return batch - - -# Functions used to put data to the memgraph process. - -TYPE_NONE = 0x10 -TYPE_BOOL_FALSE = 0x20 -TYPE_BOOL_TRUE = 0x21 -TYPE_INT = 0x30 -TYPE_FLOAT = 0x40 -TYPE_STR = 0x50 -TYPE_LIST = 0x60 -TYPE_DICT = 0x70 - - -def put_data(data): - written = 0 - while written < len(data): - written += os.write(COMMUNICATION_FROM_PYTHON_FD, data[written:]) - - -def put_size(size): - fmt = "I" # uint32_t - put_data(struct.pack(fmt, size)) - - -def put_type(typ): - fmt = "B" # uint8_t - put_data(struct.pack(fmt, typ)) - - -def put_string(value): - data = value.encode("utf-8") - put_size(len(data)) - put_data(data) - - -def put_value(value, ids): - if value is None: - put_type(TYPE_NONE) - elif type(value) is bool: - if value: - put_type(TYPE_BOOL_TRUE) - else: - put_type(TYPE_BOOL_FALSE) - elif type(value) is int: - put_type(TYPE_INT) - put_data(struct.pack("q", value)) # int64_t - elif type(value) is float: - put_type(TYPE_FLOAT) - put_data(struct.pack("d", value)) # double - elif type(value) is str: - put_type(TYPE_STR) - put_string(value) - elif type(value) is list: - if id(value) in ids: - raise ValueError("Recursive objects are not supported!") - ids_new = ids + [id(value)] - put_type(TYPE_LIST) - put_size(len(value)) - for item in value: - put_value(item, ids_new) - elif type(value) is dict: - if id(value) in ids: - raise ValueError("Recursive objects are not supported!") - ids_new = ids + [id(value)] - put_type(TYPE_DICT) - put_size(len(value)) - for key, item in value.items(): - if type(key) is not str: - raise TypeError("Dictionary keys must be strings!") - put_string(key) - put_value(item, ids_new) - else: - raise TypeError("Unsupported value type {}!".format(str(type(value)))) - - -# Functions used to continuously process data. - -def put_params(params): - if type(params) != dict: - raise TypeError("Parameters must be a dict!") - put_value(params, []) - - -class StreamError(Exception): - pass - - -def process_batch(): - # Get the data that should be transformed. - batch = get_batch() - - # Transform the data. - ret = transform.stream(batch) - - # Sanity checks for the transformed data. - if type(ret) != list: - raise StreamError("The transformed items must be a list!") - for item in ret: - if type(item) not in [list, tuple]: - raise StreamError("The transformed item must be a tuple " - "or a list!") - if len(item) != 2: - raise StreamError("There must be exactly two elements in the " - "transformed item!") - if type(item[0]) != str: - raise StreamError("The first transformed element of an item " - "must be a string!") - if type(item[1]) != dict: - raise StreamError("The second transformed element of an item " - "must be a dictionary!") - - # Send the items to the server. - put_size(len(ret)) - for query, params in ret: - put_string(query) - put_params(params) - - -# Main entry point. 
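[Editor's note] The helper above reads its input as length-prefixed frames: a uint32_t message count, then for every message a uint32_t size followed by the raw bytes (see get_size/get_batch). The writing side of that framing lives in transform.cpp, also removed later in this patch, as PutSize/PutData. A condensed, self-contained sketch of that writer (the file descriptor and batch contents are placeholders):

```cpp
// Sketch of the length-prefixed framing that kafka.py's get_batch() expects:
// uint32_t count, then for each message a uint32_t size plus payload bytes.
#include <unistd.h>

#include <cerrno>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

void PutData(int fd, const uint8_t *data, uint32_t size) {
  uint32_t put = 0;
  while (put < size) {
    ssize_t ret = write(fd, data + put, size - put);
    if (ret > 0) {
      put += ret;
    } else if (ret == 0 || errno != EINTR) {
      throw std::runtime_error("Couldn't write to the pipe!");
    }
  }
}

void PutSize(int fd, uint32_t size) {
  PutData(fd, reinterpret_cast<const uint8_t *>(&size), sizeof(size));
}

// Send a whole batch of raw messages down the pipe in the expected framing.
void PutBatch(int fd, const std::vector<std::string> &batch) {
  PutSize(fd, static_cast<uint32_t>(batch.size()));
  for (const auto &msg : batch) {
    PutSize(fd, static_cast<uint32_t>(msg.size()));
    PutData(fd, reinterpret_cast<const uint8_t *>(msg.data()), msg.size());
  }
}
```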
- -if __name__ == "__main__": - while True: - process_batch() diff --git a/src/integrations/kafka/streams.cpp b/src/integrations/kafka/streams.cpp deleted file mode 100644 index 54ad2070c..000000000 --- a/src/integrations/kafka/streams.cpp +++ /dev/null @@ -1,274 +0,0 @@ -#include "integrations/kafka/streams.hpp" - -#include -#include -#include - -#include - -#include "integrations/kafka/exceptions.hpp" -#include "requests/requests.hpp" -#include "utils/file.hpp" - -namespace integrations::kafka { - -namespace fs = std::filesystem; - -const std::string kMetadataDir = "metadata"; -const std::string kTransformDir = "transform"; -const std::string kTransformExt = ".py"; - -namespace { - -nlohmann::json Serialize(const StreamInfo &info) { - nlohmann::json data = nlohmann::json::object(); - data["stream_name"] = info.stream_name; - data["stream_uri"] = info.stream_uri; - data["stream_topic"] = info.stream_topic; - data["transform_uri"] = info.transform_uri; - - if (info.batch_interval_in_ms) { - data["batch_interval_in_ms"] = info.batch_interval_in_ms.value(); - } else { - data["batch_interval_in_ms"] = nullptr; - } - - if (info.batch_size) { - data["batch_size"] = info.batch_size.value(); - } else { - data["batch_size"] = nullptr; - } - - if (info.limit_batches) { - data["limit_batches"] = info.limit_batches.value(); - } else { - data["limit_batches"] = nullptr; - } - - data["is_running"] = info.is_running; - - return data; -} - -StreamInfo Deserialize(const nlohmann::json &data) { - if (!data.is_object()) throw StreamDeserializationException(); - - StreamInfo info; - - if (!data["stream_name"].is_string()) throw StreamDeserializationException(); - info.stream_name = data["stream_name"]; - - if (!data["stream_uri"].is_string()) throw StreamDeserializationException(); - info.stream_uri = data["stream_uri"]; - - if (!data["stream_topic"].is_string()) throw StreamDeserializationException(); - info.stream_topic = data["stream_topic"]; - - if (!data["transform_uri"].is_string()) - throw StreamDeserializationException(); - info.transform_uri = data["transform_uri"]; - - if (data["batch_interval_in_ms"].is_number()) { - info.batch_interval_in_ms = data["batch_interval_in_ms"]; - } else if (data["batch_interval_in_ms"].is_null()) { - info.batch_interval_in_ms = std::nullopt; - } else { - throw StreamDeserializationException(); - } - - if (data["batch_size"].is_number()) { - info.batch_size = data["batch_size"]; - } else if (data["batch_size"].is_null()) { - info.batch_size = std::nullopt; - } else { - throw StreamDeserializationException(); - } - - if (!data["is_running"].is_boolean()) throw StreamDeserializationException(); - info.is_running = data["is_running"]; - - if (data["limit_batches"].is_number()) { - info.limit_batches = data["limit_batches"]; - } else if (data["limit_batches"].is_null()) { - info.limit_batches = std::nullopt; - } else { - throw StreamDeserializationException(); - } - - return info; -} - -} // namespace - -Streams::Streams(const std::string &streams_directory, - std::function &)> - stream_writer) - : streams_directory_(streams_directory), - stream_writer_(stream_writer), - metadata_store_(fs::path(streams_directory) / kMetadataDir) {} - -void Streams::Recover() { - for (auto it = metadata_store_.begin(); it != metadata_store_.end(); ++it) { - // Check if the transform script also exists; - auto transform_script = GetTransformScriptPath(it->first); - if (!fs::exists(transform_script)) - throw TransformScriptNotFoundException(it->first); - - nlohmann::json data; - try { - data 
= nlohmann::json::parse(it->second); - } catch (const nlohmann::json::parse_error &e) { - throw StreamDeserializationException(); - } - - StreamInfo info = Deserialize(data); - Create(info, false); - if (info.is_running) Start(info.stream_name, info.limit_batches); - } -} - -void Streams::Create(const StreamInfo &info, bool download_transform_script) { - std::lock_guard g(mutex_); - if (consumers_.find(info.stream_name) != consumers_.end()) - throw StreamExistsException(info.stream_name); - - // Make sure transform directory exists or we can create it. - if (!utils::EnsureDir(GetTransformScriptDir())) { - throw TransformScriptCouldNotBeCreatedException(info.stream_name); - } - - // Download the transform script. - auto transform_script_path = GetTransformScriptPath(info.stream_name); - if (download_transform_script && - !requests::CreateAndDownloadFile(info.transform_uri, - transform_script_path)) { - throw TransformScriptDownloadException(info.transform_uri); - } - - // Store stream_info in metadata_store_. - if (!metadata_store_.Put(info.stream_name, Serialize(info).dump())) { - throw StreamMetadataCouldNotBeStored(info.stream_name); - } - - try { - consumers_.emplace( - std::piecewise_construct, std::forward_as_tuple(info.stream_name), - std::forward_as_tuple(info, transform_script_path, stream_writer_)); - } catch (const KafkaStreamException &e) { - // If we failed to create the consumer, remove the persisted metadata. - metadata_store_.Delete(info.stream_name); - // Rethrow the exception. - throw; - } -} - -void Streams::Drop(const std::string &stream_name) { - std::lock_guard g(mutex_); - auto find_it = consumers_.find(stream_name); - if (find_it == consumers_.end()) - throw StreamDoesntExistException(stream_name); - - // Erase and implicitly stop the consumer. - consumers_.erase(find_it); - - // Remove stream_info in metadata_store_. - if (!metadata_store_.Delete(stream_name)) { - throw StreamMetadataCouldNotBeDeleted(stream_name); - } - - // Remove transform script. - if (std::remove(GetTransformScriptPath(stream_name).c_str())) { - throw TransformScriptNotFoundException(stream_name); - } -} - -void Streams::Start(const std::string &stream_name, - std::optional limit_batches) { - std::lock_guard g(mutex_); - auto find_it = consumers_.find(stream_name); - if (find_it == consumers_.end()) - throw StreamDoesntExistException(stream_name); - - find_it->second.Start(limit_batches); - - // Store stream_info in metadata_store_. - if (!metadata_store_.Put(stream_name, - Serialize(find_it->second.Info()).dump())) { - throw StreamMetadataCouldNotBeStored(stream_name); - } -} - -void Streams::Stop(const std::string &stream_name) { - std::lock_guard g(mutex_); - auto find_it = consumers_.find(stream_name); - if (find_it == consumers_.end()) - throw StreamDoesntExistException(stream_name); - - find_it->second.Stop(); - - // Store stream_info in metadata_store_. - if (!metadata_store_.Put(stream_name, - Serialize(find_it->second.Info()).dump())) { - throw StreamMetadataCouldNotBeStored(stream_name); - } -} - -void Streams::StartAll() { - std::lock_guard g(mutex_); - for (auto &consumer_kv : consumers_) { - consumer_kv.second.StartIfStopped(); - - // Store stream_info in metadata_store_. 
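[Editor's note] The Serialize()/Deserialize() pair above persists the optional StreamInfo fields (batch_interval_in_ms, batch_size, limit_batches) as JSON null when unset. A small standalone sketch of that round-trip pattern with nlohmann::json, assuming the usual <nlohmann/json.hpp> header path (the vendored header name in this tree may differ):

```cpp
// Sketch: persist an optional field as JSON null and read it back, the same
// pattern Streams uses for batch_size and friends.
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

#include <nlohmann/json.hpp>

int main() {
  std::optional<int64_t> batch_size;  // not set by the user

  nlohmann::json data = nlohmann::json::object();
  if (batch_size) {
    data["batch_size"] = batch_size.value();
  } else {
    data["batch_size"] = nullptr;
  }
  std::string serialized = data.dump();

  auto parsed = nlohmann::json::parse(serialized);
  std::optional<int64_t> restored;
  if (parsed["batch_size"].is_number()) {
    restored = parsed["batch_size"].get<int64_t>();
  } else if (!parsed["batch_size"].is_null()) {
    throw std::runtime_error("Unexpected type for batch_size!");
  }
  std::cout << (restored ? "set" : "null") << std::endl;
  return 0;
}
```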
- if (!metadata_store_.Put(consumer_kv.first, - Serialize(consumer_kv.second.Info()).dump())) { - throw StreamMetadataCouldNotBeStored(consumer_kv.first); - } - } -} - -void Streams::StopAll() { - std::lock_guard g(mutex_); - for (auto &consumer_kv : consumers_) { - consumer_kv.second.StopIfRunning(); - - // Store stream_info in metadata_store_. - if (!metadata_store_.Put(consumer_kv.first, - Serialize(consumer_kv.second.Info()).dump())) { - throw StreamMetadataCouldNotBeStored(consumer_kv.first); - } - } -} - -std::vector Streams::Show() { - std::vector streams; - std::lock_guard g(mutex_); - for (auto &consumer_kv : consumers_) { - streams.emplace_back(consumer_kv.second.Status()); - } - - return streams; -} - -std::vector< - std::pair>> -Streams::Test(const std::string &stream_name, - std::optional limit_batches) { - std::lock_guard g(mutex_); - auto find_it = consumers_.find(stream_name); - if (find_it == consumers_.end()) - throw StreamDoesntExistException(stream_name); - - return find_it->second.Test(limit_batches); -} - -std::string Streams::GetTransformScriptDir() { - return fs::path(streams_directory_) / kTransformDir; -} - -std::string Streams::GetTransformScriptPath(const std::string &stream_name) { - return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt); -} - -} // namespace integrations::kafka diff --git a/src/integrations/kafka/streams.hpp b/src/integrations/kafka/streams.hpp deleted file mode 100644 index e199b6425..000000000 --- a/src/integrations/kafka/streams.hpp +++ /dev/null @@ -1,127 +0,0 @@ -/// @file -#pragma once - -#include -#include -#include - -#include "integrations/kafka/consumer.hpp" -#include "storage/common/kvstore/kvstore.hpp" - -namespace integrations::kafka { - -/// Manages kafka consumers. -/// -/// This class is responsible for all query supported actions to happen. -class Streams final { - public: - /// Initialize streams. - /// - /// @param streams_directory path on the filesystem where the streams metadata - /// will be persisted and where the transform scripts will be - /// downloaded - /// @param stream_writer lambda that knows how to write data to the db - Streams(const std::string &streams_directory, - std::function< - void(const std::string &, - const std::map &)> - stream_writer); - - /// Looks for persisted metadata and tries to recover consumers. - /// - /// @throws TransformScriptNotFoundException if the transform script is - // missing - /// @throws StreamDeserializationException if the metadata can't be recovered - void Recover(); - - /// Creates a new import stream. - /// This method makes sure there is no other stream with the same name, - /// downloads the given transform script and writes metadata to persisted - /// store. - /// - /// @param info StreamInfo struct with necessary data for a kafka consumer. - /// @param download_transform_script Denote whether or not the transform - /// script should be downloaded. - /// - /// @throws StreamExistsException if the stream with the same name exists - /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata - /// @throws TransformScriptCouldNotBeCreatedException if the script could not - /// be created - void Create(const StreamInfo &info, bool download_transform_script = true); - - /// Deletes an existing stream and all the data that was persisted. - /// - /// @param stream_name name of the stream that needs to be deleted. 
- /// - /// @throws StreamDoesntExistException if the stream doesn't exist - /// @throws StreamMetadataCouldNotBeDeleted if the persisted metadata can't be - /// delteed - /// @throws TransformScriptNotFoundException if the transform script can't be - /// deleted - void Drop(const std::string &stream_name); - - /// Start consuming from a stream. - /// - /// @param stream_name name of the stream we want to start consuming - /// @param batch_limit number of batches we want to import before stopping - /// - /// @throws StreamDoesntExistException if the stream doesn't exist - /// @throws ConsumerRunningException if the consumer is already running - /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata - void Start(const std::string &stream_name, - std::optional batch_limit = std::nullopt); - - /// Stop consuming from a stream. - /// - /// @param stream_name name of the stream we wanto to stop consuming - /// - /// @throws StreamDoesntExistException if the stream doesn't exist - /// @throws ConsumerStoppedException if the consumer is already stopped - /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata - void Stop(const std::string &stream_name); - - /// Start consuming from all streams that are stopped. - /// - /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata - void StartAll(); - - /// Stop consuming from all streams that are running. - /// - /// @throws StreamMetadataCouldNotBeStored if it can't persist metadata - void StopAll(); - - /// Return current status for all streams. - std::vector Show(); - - /// Do a dry-run consume from a stream. - /// - /// @param stream_name name of the stream we want to test - /// @param batch_limit number of batches we want to test before stopping - /// - /// @returns A vector of pairs consisting of the query (std::string) and its - /// parameters (std::map>> - Test(const std::string &stream_name, - std::optional batch_limit = std::nullopt); - - private: - std::string streams_directory_; - /// Custom lambda that "knows" how to execute queries. - std::function &)> - stream_writer_; - - /// Key value storage used as a persistent storage for stream metadata. - storage::KVStore metadata_store_; - - std::mutex mutex_; - std::unordered_map consumers_; - - std::string GetTransformScriptDir(); - std::string GetTransformScriptPath(const std::string &stream_name); -}; - -} // namespace integrations::kafka diff --git a/src/integrations/kafka/transform.cpp b/src/integrations/kafka/transform.cpp deleted file mode 100644 index 606e564dc..000000000 --- a/src/integrations/kafka/transform.cpp +++ /dev/null @@ -1,654 +0,0 @@ -#include "integrations/kafka/transform.hpp" - -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "communication/bolt/v1/value.hpp" -#include "integrations/kafka/exceptions.hpp" -#include "utils/file.hpp" - -DEFINE_string(python_interpreter, "/usr/bin/python3", - "Path to the Python 3.x interpreter that should be used"); - -namespace { - -/////////////////////// -// Namespace shortcuts. -/////////////////////// - -using communication::bolt::Value; -using integrations::kafka::TargetArguments; -using integrations::kafka::TransformExecutionException; -namespace fs = std::filesystem; - -///////////////////////////////////////////////////////////////////////// -// Constants used for starting and communicating with the target process. 
-///////////////////////////////////////////////////////////////////////// - -const int kPipeReadEnd = 0; -const int kPipeWriteEnd = 1; - -const int kCommunicationToPythonFd = 1000; -const int kCommunicationFromPythonFd = 1002; - -const int kTerminateTimeoutSec = 5; - -const std::string kHelperScriptName = "kafka.py"; -const std::string kTransformScriptName = "transform.py"; - -//////////////////// -// Helper functions. -//////////////////// - -fs::path GetTemporaryPath(pid_t pid) { - return fs::temp_directory_path() / "memgraph" / - fmt::format("transform_{}", pid); -} - -fs::path GetHelperScriptPath() { - char path[PATH_MAX]; - memset(path, 0, PATH_MAX); - auto ret = readlink("/proc/self/exe", path, PATH_MAX); - if (ret < 0) return ""; - return fs::path() / std::string(dirname(path)) / kHelperScriptName; -} - -std::string GetEnvironmentVariable(const std::string &name) { - char *value = secure_getenv(name.c_str()); - if (value == nullptr) return ""; - return {value}; -} - -/////////////////////////////////////////// -// char** wrapper used for C library calls. -/////////////////////////////////////////// - -const int kCharppMaxElements = 20; - -class CharPP final { - public: - CharPP() { memset(data_, 0, sizeof(char *) * kCharppMaxElements); } - - ~CharPP() { - for (size_t i = 0; i < size_; ++i) { - free(data_[i]); - } - } - - CharPP(const CharPP &) = delete; - CharPP(CharPP &&) = delete; - CharPP &operator=(const CharPP &) = delete; - CharPP &operator=(CharPP &&) = delete; - - void Add(const char *value) { - if (size_ == kCharppMaxElements) return; - int len = strlen(value); - char *item = (char *)malloc(sizeof(char) * (len + 1)); - if (item == nullptr) return; - memcpy(item, value, len); - item[len] = 0; - data_[size_++] = item; - } - - void Add(const std::string &value) { Add(value.c_str()); } - - char **Get() { return data_; } - - private: - char *data_[kCharppMaxElements]; - size_t size_{0}; -}; - -//////////////////////////////////// -// Security functions and constants. 
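[Editor's note] The section removed below builds the sandbox for the transform child process: it whitelists a fixed set of syscalls and blocks open() with write/create flags. As a rough illustration of the libseccomp pattern involved, here is a toy default-kill filter that allows only a handful of syscalls. It is not the removed filter (which is far larger and uses SCMP_ACT_TRAP as the default action); compile with -lseccomp.

```cpp
// Toy libseccomp filter: kill the process on any syscall except the few
// allowed below.
#include <unistd.h>

#include <seccomp.h>

bool SetupToySeccomp() {
  scmp_filter_ctx ctx = seccomp_init(SCMP_ACT_KILL);  // default: kill
  if (ctx == NULL) return false;

  // Whitelist just enough to write a message and exit cleanly.
  const int allowed[] = {SCMP_SYS(write), SCMP_SYS(exit),
                         SCMP_SYS(exit_group), SCMP_SYS(rt_sigreturn)};
  for (int syscall_num : allowed) {
    if (seccomp_rule_add(ctx, SCMP_ACT_ALLOW, syscall_num, 0) != 0) {
      seccomp_release(ctx);
      return false;
    }
  }

  // Install the filter for this process, then free the context.
  int ret = seccomp_load(ctx);
  seccomp_release(ctx);
  return ret == 0;
}

int main() {
  if (!SetupToySeccomp()) return 1;
  const char msg[] = "write() is still allowed\n";
  write(STDOUT_FILENO, msg, sizeof(msg) - 1);
  _exit(0);  // bypass libc cleanup, which could hit blocked syscalls
}
```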
-//////////////////////////////////// - -const std::vector seccomp_syscalls_allowed = { - SCMP_SYS(read), - SCMP_SYS(write), - SCMP_SYS(close), - SCMP_SYS(stat), - SCMP_SYS(fstat), - SCMP_SYS(lstat), - SCMP_SYS(poll), - SCMP_SYS(lseek), - SCMP_SYS(mmap), - SCMP_SYS(mprotect), - SCMP_SYS(munmap), - SCMP_SYS(brk), - SCMP_SYS(rt_sigaction), - SCMP_SYS(rt_sigprocmask), - SCMP_SYS(rt_sigreturn), - SCMP_SYS(ioctl), - SCMP_SYS(pread64), - SCMP_SYS(pwrite64), - SCMP_SYS(readv), - SCMP_SYS(writev), - SCMP_SYS(access), - SCMP_SYS(select), - SCMP_SYS(mremap), - SCMP_SYS(msync), - SCMP_SYS(mincore), - SCMP_SYS(madvise), - SCMP_SYS(dup), - SCMP_SYS(dup2), - SCMP_SYS(pause), - SCMP_SYS(nanosleep), - SCMP_SYS(getpid), - SCMP_SYS(sendfile), - SCMP_SYS(execve), - SCMP_SYS(exit), - SCMP_SYS(uname), - SCMP_SYS(fcntl), - SCMP_SYS(fsync), - SCMP_SYS(fdatasync), - SCMP_SYS(getdents), - SCMP_SYS(getcwd), - SCMP_SYS(readlink), - SCMP_SYS(gettimeofday), - SCMP_SYS(getrlimit), - SCMP_SYS(getrusage), - SCMP_SYS(getuid), - SCMP_SYS(getgid), - SCMP_SYS(geteuid), - SCMP_SYS(getegid), - SCMP_SYS(getppid), - SCMP_SYS(getpgrp), - SCMP_SYS(rt_sigpending), - SCMP_SYS(rt_sigtimedwait), - SCMP_SYS(rt_sigsuspend), - SCMP_SYS(sched_setparam), - SCMP_SYS(mlock), - SCMP_SYS(munlock), - SCMP_SYS(mlockall), - SCMP_SYS(munlockall), - SCMP_SYS(arch_prctl), - SCMP_SYS(ioperm), - SCMP_SYS(time), - SCMP_SYS(futex), - SCMP_SYS(set_tid_address), - SCMP_SYS(clock_gettime), - SCMP_SYS(clock_getres), - SCMP_SYS(clock_nanosleep), - SCMP_SYS(exit_group), - SCMP_SYS(mbind), - SCMP_SYS(set_mempolicy), - SCMP_SYS(get_mempolicy), - SCMP_SYS(migrate_pages), - SCMP_SYS(openat), - SCMP_SYS(pselect6), - SCMP_SYS(ppoll), - SCMP_SYS(set_robust_list), - SCMP_SYS(get_robust_list), - SCMP_SYS(tee), - SCMP_SYS(move_pages), - SCMP_SYS(dup3), - SCMP_SYS(preadv), - SCMP_SYS(pwritev), - SCMP_SYS(getrandom), - SCMP_SYS(sigaltstack), - SCMP_SYS(gettid), - SCMP_SYS(tgkill), - SCMP_SYS(sysinfo), -}; - -bool SetupSeccomp() { - // Initialize the seccomp context. - scmp_filter_ctx ctx; - ctx = seccomp_init(SCMP_ACT_TRAP); - if (ctx == NULL) return false; - - // First we deny access to the `open` system call called with `O_WRONLY`, - // `O_RDWR` and `O_CREAT`. - if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, - SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY, O_WRONLY)) != 0) { - seccomp_release(ctx); - return false; - } - if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, - SCMP_A1(SCMP_CMP_MASKED_EQ, O_RDWR, O_RDWR)) != 0) { - seccomp_release(ctx); - return false; - } - if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, - SCMP_A1(SCMP_CMP_MASKED_EQ, O_CREAT, O_CREAT)) != 0) { - seccomp_release(ctx); - return false; - } - // Now we allow the `open` system call without the blocked flags. - if (seccomp_rule_add( - ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1, - SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY | O_RDWR | O_CREAT, 0)) != 0) { - seccomp_release(ctx); - return false; - } - - // Add all general allow rules. - for (auto syscall_num : seccomp_syscalls_allowed) { - if (seccomp_rule_add(ctx, SCMP_ACT_ALLOW, syscall_num, 0) != 0) { - seccomp_release(ctx); - return false; - } - } - - // Load the context for the current process. - auto ret = seccomp_load(ctx); - - // Free the context and return success/failure. 
- seccomp_release(ctx); - return ret == 0; -} - -bool SetLimit(int resource, rlim_t n) { - struct rlimit limit; - limit.rlim_cur = limit.rlim_max = n; - return setrlimit(resource, &limit) == 0; -} - -/////////////////////////////////////////////////////// -// Target function used to start the transform process. -/////////////////////////////////////////////////////// - -int Target(void *arg) { - // NOTE: (D)LOG shouldn't be used here because it wasn't initialized in this - // process and something really bad could happen. - - // Get a pointer to the passed arguments. - TargetArguments *ta = reinterpret_cast(arg); - - // Redirect `stdin` to `/dev/null`. - int fd = open("/dev/null", O_RDONLY | O_CLOEXEC); - if (fd == -1) { - return EXIT_FAILURE; - } - if (dup2(fd, STDIN_FILENO) != STDIN_FILENO) { - return EXIT_FAILURE; - } - - // Redirect `stdout` to `/dev/null`. - fd = open("/dev/null", O_WRONLY | O_CLOEXEC); - if (fd == -1) { - return EXIT_FAILURE; - } - if (dup2(fd, STDOUT_FILENO) != STDOUT_FILENO) { - return EXIT_FAILURE; - } - - // Redirect `stderr` to `/dev/null`. - fd = open("/dev/null", O_WRONLY | O_CLOEXEC); - if (fd == -1) { - return EXIT_FAILURE; - } - if (dup2(fd, STDERR_FILENO) != STDERR_FILENO) { - return EXIT_FAILURE; - } - - // Create the working directory. - fs::path working_path = GetTemporaryPath(getpid()); - utils::DeleteDir(working_path); - if (!utils::EnsureDir(working_path)) { - return EXIT_FAILURE; - } - - // Copy all scripts to the working directory. - if (!utils::CopyFile(GetHelperScriptPath(), - working_path / kHelperScriptName)) { - return EXIT_FAILURE; - } - if (!utils::CopyFile(ta->transform_script_path, - working_path / kTransformScriptName)) { - return EXIT_FAILURE; - } - - // Change the current directory to the working directory. - if (chdir(working_path.c_str()) != 0) { - return EXIT_FAILURE; - } - - // Create the executable CharPP object. - CharPP exe; - exe.Add(FLAGS_python_interpreter); - exe.Add(kHelperScriptName); - - // Create the environment CharPP object. - CharPP env; - env.Add(fmt::format("PATH={}", GetEnvironmentVariable("PATH"))); - // TODO (mferencevic): Change this to the effective user. - env.Add(fmt::format("USER={}", GetEnvironmentVariable("USER"))); - env.Add(fmt::format("HOME={}", working_path)); - env.Add("LANG=en_US.utf8"); - env.Add("LANGUAGE=en_US:en"); - env.Add("PYTHONUNBUFFERED=1"); - env.Add("PYTHONIOENCODING=utf-8"); - env.Add("PYTHONDONTWRITEBYTECODE=1"); - - // Connect the communication input pipe. - if (dup2(ta->pipe_to_python, kCommunicationToPythonFd) != - kCommunicationToPythonFd) { - return EXIT_FAILURE; - } - - // Connect the communication output pipe. - if (dup2(ta->pipe_from_python, kCommunicationFromPythonFd) != - kCommunicationFromPythonFd) { - return EXIT_FAILURE; - } - - // Set process limits. - // Disable core dumps. - if (!SetLimit(RLIMIT_CORE, 0)) { - return EXIT_FAILURE; - } - // Disable file creation. - if (!SetLimit(RLIMIT_FSIZE, 0)) { - return EXIT_FAILURE; - } - // Set process number limit. - if (!SetLimit(RLIMIT_NPROC, 0)) { - return EXIT_FAILURE; - } - - // TODO (mferencevic): Change the user to `nobody`. - - // Setup seccomp. - if (!SetupSeccomp()) { - return EXIT_FAILURE; - } - - execve(*exe.Get(), exe.Get(), env.Get()); - - // TODO (mferencevic): Log an error with `errno` about what failed. - - return EXIT_FAILURE; -} - -///////////////////////////////////////////////////////////// -// Functions used to send data to the started Python process. 
-///////////////////////////////////////////////////////////// - -/// The data that is being sent to the Python process is always a -/// `std::vector` of data. It is sent in the following way: -/// -/// uint32_t number of elements being sent -/// uint32_t element 0 size -/// uint8_t[] element 0 data -/// uint32_t element 1 size -/// uint8_t[] element 1 data -/// ... -/// -/// The receiving end of the protocol is implemented in `kafka.py`. - -void PutData(int fd, const uint8_t *data, uint32_t size) { - int ret = 0; - uint32_t put = 0; - while (put < size) { - ret = write(fd, data + put, size - put); - if (ret > 0) { - put += ret; - } else if (ret == 0) { - throw TransformExecutionException( - "The communication pipe to the transform process was closed!"); - } else if (errno != EINTR) { - throw TransformExecutionException( - "Couldn't put data to the transfrom process!"); - } - } -} - -void PutSize(int fd, uint32_t size) { - PutData(fd, reinterpret_cast(&size), sizeof(size)); -} - -////////////////////////////////////////////////////////////// -// Functions used to get data from the started Python process. -////////////////////////////////////////////////////////////// - -/// The data that is being sent from the Python process is always a -/// `std::vector>>` of data (array of pairs of -/// query and params). It is sent in the following way: -/// -/// uint32_t number of elements being sent -/// uint32_t element 0 query size -/// char[] element 0 query data -/// data[] element 0 params -/// uint32_t element 1 query size -/// char[] element 1 query data -/// data[] element 1 params -/// ... -/// -/// When sending the query parameters they have to be further encoded to enable -/// sending of None, Bool, Int, Float, Str, List and Dict objects. The encoding -/// is as follows: -/// -/// None: uint8_t type (kTypeNone) -/// Bool: uint8_t type (kTypeBoolFalse or kTypeBoolTrue) -/// Int: uint8_t type (kTypeInt), int64_t value -/// Float: uint8_t type (kTypeFloat), double value -/// Str: uint8_t type (kTypeStr), uint32_t size, char[] data -/// List: uint8_t type (kTypeList), uint32_t size, data[] element 0, -/// data[] element 1, ... -/// Dict: uint8_t type (kTypeDict), uint32_t size, uint32_t element 0 key size, -/// char[] element 0 key data, data[] element 0 value, -/// uint32_t element 1 key size, char[] element 1 key data, -/// data[] element 1 value, ... -/// -/// The sending end of the protocol is implemented in `kafka.py`. 
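[Editor's note] To make the protocol comment above concrete, here is a small sketch of an encoder for a subset of the documented value types (None, Bool, Int, Str), following exactly the layout described: a one-byte type tag, then the payload in host byte order. The tag values match the kType constants defined below and in kafka.py; the buffer-based helpers here are hypothetical stand-ins, since in the removed code the encoding side lives in Python and C++ only decodes. A frame produced this way is what GetValue() below parses back into a communication::bolt::Value.

```cpp
// Sketch: encode None, Bool, Int and Str using the documented wire layout.
#include <cstdint>
#include <string>
#include <vector>

namespace {

void AppendRaw(std::vector<uint8_t> *out, const void *data, size_t size) {
  const uint8_t *bytes = static_cast<const uint8_t *>(data);
  out->insert(out->end(), bytes, bytes + size);
}

void AppendSize(std::vector<uint8_t> *out, uint32_t size) {
  AppendRaw(out, &size, sizeof(size));
}

void EncodeNone(std::vector<uint8_t> *out) { out->push_back(0x10); }

void EncodeBool(std::vector<uint8_t> *out, bool value) {
  out->push_back(value ? 0x21 : 0x20);
}

void EncodeInt(std::vector<uint8_t> *out, int64_t value) {
  out->push_back(0x30);
  AppendRaw(out, &value, sizeof(value));  // host byte order, as in kafka.py
}

void EncodeStr(std::vector<uint8_t> *out, const std::string &value) {
  out->push_back(0x50);
  AppendSize(out, static_cast<uint32_t>(value.size()));
  AppendRaw(out, value.data(), value.size());
}

}  // namespace

int main() {
  std::vector<uint8_t> frame;
  EncodeStr(&frame, "some parameter value");  // hypothetical string parameter
  EncodeInt(&frame, 42);
  EncodeBool(&frame, true);
  EncodeNone(&frame);
  return frame.empty() ? 1 : 0;
}
```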
- -const uint8_t kTypeNone = 0x10; -const uint8_t kTypeBoolFalse = 0x20; -const uint8_t kTypeBoolTrue = 0x21; -const uint8_t kTypeInt = 0x30; -const uint8_t kTypeFloat = 0x40; -const uint8_t kTypeStr = 0x50; -const uint8_t kTypeList = 0x60; -const uint8_t kTypeDict = 0x70; - -void GetData(int fd, uint8_t *data, uint32_t size) { - int ret = 0; - uint32_t got = 0; - while (got < size) { - ret = read(fd, data + got, size - got); - if (ret > 0) { - got += ret; - } else if (ret == 0) { - throw TransformExecutionException( - "The communication pipe from the transform process was closed!"); - } else if (errno != EINTR) { - throw TransformExecutionException( - "Couldn't get data from the transform process!"); - } - } -} - -uint32_t GetSize(int fd) { - uint32_t size = 0; - GetData(fd, reinterpret_cast(&size), sizeof(size)); - return size; -} - -void GetString(int fd, std::string *value) { - const int kMaxStackBuffer = 8192; - uint8_t buffer[kMaxStackBuffer]; - uint32_t size = GetSize(fd); - if (size < kMaxStackBuffer) { - GetData(fd, buffer, size); - *value = std::string(reinterpret_cast(buffer), size); - } else { - std::unique_ptr tmp(new uint8_t[size]); - GetData(fd, tmp.get(), size); - *value = std::string(reinterpret_cast(tmp.get()), size); - } -} - -void GetValue(int fd, Value *value) { - uint8_t type = 0; - GetData(fd, &type, sizeof(type)); - if (type == kTypeNone) { - *value = Value(); - } else if (type == kTypeBoolFalse) { - *value = Value(false); - } else if (type == kTypeBoolTrue) { - *value = Value(true); - } else if (type == kTypeInt) { - int64_t tmp = 0; - GetData(fd, reinterpret_cast(&tmp), sizeof(tmp)); - *value = Value(tmp); - } else if (type == kTypeFloat) { - double tmp = 0.0; - GetData(fd, reinterpret_cast(&tmp), sizeof(tmp)); - *value = Value(tmp); - } else if (type == kTypeStr) { - std::string tmp; - GetString(fd, &tmp); - *value = Value(tmp); - } else if (type == kTypeList) { - std::vector tmp_vec; - uint32_t size = GetSize(fd); - tmp_vec.reserve(size); - for (uint32_t i = 0; i < size; ++i) { - Value tmp_value; - GetValue(fd, &tmp_value); - tmp_vec.push_back(tmp_value); - } - *value = Value(tmp_vec); - } else if (type == kTypeDict) { - std::map tmp_map; - uint32_t size = GetSize(fd); - for (uint32_t i = 0; i < size; ++i) { - std::string tmp_key; - Value tmp_value; - GetString(fd, &tmp_key); - GetValue(fd, &tmp_value); - tmp_map.insert({tmp_key, tmp_value}); - } - *value = Value(tmp_map); - } else { - throw TransformExecutionException( - fmt::format("Couldn't get value of unsupported type 0x{:02x}!", type)); - } -} - -} // namespace - -namespace integrations::kafka { - -Transform::Transform(const std::string &transform_script_path) - : transform_script_path_(transform_script_path) {} - -bool Transform::Start() { - // Setup communication pipes. - if (pipe2(pipe_to_python_, O_CLOEXEC) != 0) { - DLOG(ERROR) << "Couldn't create communication pipe from cpp to python!"; - return false; - } - if (pipe2(pipe_from_python_, O_CLOEXEC) != 0) { - DLOG(ERROR) << "Couldn't create communication pipe from python to cpp!"; - return false; - } - - // Find the top of the stack. - uint8_t *stack_top = stack_.get() + kStackSizeBytes; - - // Set the target arguments. - target_arguments_->transform_script_path = transform_script_path_; - target_arguments_->pipe_to_python = pipe_to_python_[kPipeReadEnd]; - target_arguments_->pipe_from_python = pipe_from_python_[kPipeWriteEnd]; - - // Create the process. 
-  pid_ = clone(Target, stack_top, CLONE_VFORK, target_arguments_.get());
-  if (pid_ == -1) {
-    DLOG(ERROR) << "Couldn't create the communication process!";
-    return false;
-  }
-
-  // Close pipes that won't be used from the master process.
-  close(pipe_to_python_[kPipeReadEnd]);
-  close(pipe_from_python_[kPipeWriteEnd]);
-
-  return true;
-}
-
-void Transform::Apply(
-    const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
-    std::function<void(const std::string &,
-                       const std::map<std::string, Value> &)>
-        query_function) {
-  // Check that the process is alive.
-  if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) != 0) {
-    throw TransformExecutionException("The transform process has died!");
-  }
-
-  // Put the `batch` data to the transform process.
-  PutSize(pipe_to_python_[kPipeWriteEnd], batch.size());
-  for (const auto &item : batch) {
-    PutSize(pipe_to_python_[kPipeWriteEnd], item->len());
-    PutData(pipe_to_python_[kPipeWriteEnd],
-            reinterpret_cast<const uint8_t *>(item->payload()), item->len());
-  }
-
-  // Get `query` and `params` data from the transform process.
-  uint32_t size = GetSize(pipe_from_python_[kPipeReadEnd]);
-  for (uint32_t i = 0; i < size; ++i) {
-    std::string query;
-    Value params;
-    GetString(pipe_from_python_[kPipeReadEnd], &query);
-    GetValue(pipe_from_python_[kPipeReadEnd], &params);
-    query_function(query, params.ValueMap());
-  }
-}
-
-Transform::~Transform() {
-  // Try to terminate the process gracefully in `kTerminateTimeoutSec`.
-  std::this_thread::sleep_for(std::chrono::milliseconds(100));
-  for (int i = 0; i < kTerminateTimeoutSec * 10; ++i) {
-    DLOG(INFO) << "Terminating the transform process with pid " << pid_;
-    kill(pid_, SIGTERM);
-    std::this_thread::sleep_for(std::chrono::milliseconds(100));
-    int ret = waitpid(pid_, &status_, WNOHANG | WUNTRACED);
-    if (ret == pid_ || ret == -1) {
-      break;
-    }
-  }
-
-  // If the process is still alive, kill it and wait for it to die.
-  if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) == 0) {
-    DLOG(WARNING) << "Killing the transform process with pid " << pid_;
-    kill(pid_, SIGKILL);
-    waitpid(pid_, &status_, 0);
-  }
-
-  // Delete the working directory.
-  if (pid_ != -1) {
-    utils::DeleteDir(GetTemporaryPath(pid_));
-  }
-
-  // Close leftover open pipes.
-  // We have to be careful to close only the leftover open pipes (the
-  // pipe_to_python WriteEnd and pipe_from_python ReadEnd), the other two ends
-  // were closed in the function that created them because they aren't used from
-  // the master process (they are only used from the Python process).
-  close(pipe_to_python_[kPipeWriteEnd]);
-  close(pipe_from_python_[kPipeReadEnd]);
-}
-
-}  // namespace integrations::kafka
diff --git a/src/integrations/kafka/transform.hpp b/src/integrations/kafka/transform.hpp
deleted file mode 100644
index 0e7a8e897..000000000
--- a/src/integrations/kafka/transform.hpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/// @file
-#pragma once
-
-#include
-#include
-#include
-
-#include "rdkafkacpp.h"
-
-#include "communication/bolt/v1/value.hpp"
-
-namespace integrations::kafka {
-
-struct TargetArguments {
-  std::filesystem::path transform_script_path;
-  int pipe_to_python{-1};
-  int pipe_from_python{-1};
-};
-
-/// Wrapper around the transform script for a stream.
-class Transform final {
- private:
-  const int kStackSizeBytes = 262144;
-
- public:
-  /// Download the transform script from the given URI and store it on the given
-  /// path.
- /// - /// @param transform_script_uri URI of the script - /// @param transform_script_path path on the filesystem where the script - /// will be stored - /// - /// @throws TransformScriptDownloadException if it can't download the script - explicit Transform(const std::string &transform_script_path); - - /// Starts the transform script. - /// - /// @return bool True on success or False otherwise. - bool Start(); - - /// Transform the given batch of messages using the transform script. - /// - /// @param batch kafka message batch - /// @return std::vector transformed batch of kafka messages - void Apply(const std::vector> &batch, - std::function &)> - query_function); - - ~Transform(); - - private: - std::string transform_script_path_; - pid_t pid_{-1}; - int status_{0}; - // The stack used for the `clone` system call must be heap allocated. - std::unique_ptr stack_{new uint8_t[kStackSizeBytes]}; - // The target arguments passed to the new process must be heap allocated. - std::unique_ptr target_arguments_{new TargetArguments()}; - int pipe_to_python_[2] = {-1, -1}; - int pipe_from_python_[2] = {-1, -1}; -}; - -} // namespace integrations::kafka diff --git a/src/memgraph.cpp b/src/memgraph.cpp index 98b769db8..f39af28f9 100644 --- a/src/memgraph.cpp +++ b/src/memgraph.cpp @@ -15,8 +15,6 @@ #else #include "database/single_node/graph_db.hpp" #endif -#include "integrations/kafka/exceptions.hpp" -#include "integrations/kafka/streams.hpp" #include "memgraph_init.hpp" #include "query/exceptions.hpp" #include "telemetry/telemetry.hpp" @@ -100,23 +98,7 @@ void SingleNodeMain() { query::InterpreterContext interpreter_context{&db}; SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; - integrations::kafka::Streams kafka_streams{ - durability_directory / "streams", - [&session_data]( - const std::string &query, - const std::map ¶ms) { - KafkaStreamWriter(session_data, query, params); - }}; - interpreter_context.auth = &auth; - interpreter_context.kafka_streams = &kafka_streams; - - try { - // Recover possible streams. - kafka_streams.Recover(); - } catch (const integrations::kafka::KafkaStreamException &e) { - LOG(ERROR) << e.what(); - } ServerContext context; std::string service_name = "Bolt"; diff --git a/src/memgraph_init.cpp b/src/memgraph_init.cpp index 23362a919..743c2d2d4 100644 --- a/src/memgraph_init.cpp +++ b/src/memgraph_init.cpp @@ -165,28 +165,6 @@ void BoltSession::TypedValueResultStream::Result( encoder_->MessageRecord(decoded_values); } -void KafkaStreamWriter( - SessionData &session_data, const std::string &query, - const std::map ¶ms) { - query::Interpreter interpreter(session_data.interpreter_context); - KafkaResultStream stream; - std::map params_pv; - for (const auto &kv : params) - params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second)); - - try { - // NOTE: This potentially allows Kafka streams to execute transaction - // control queries. However, those will not really work as a new - // `Interpreter` instance is created upon every call to this function, - // meaning any multicommand transaction state is lost. - interpreter.Interpret(query, params_pv); - interpreter.PullAll(&stream); - } catch (const utils::BasicException &e) { - LOG(WARNING) << "[Kafka] query execution failed with an exception: " - << e.what(); - } -}; - // Needed to correctly handle memgraph destruction from a signal handler. 
// Without having some sort of a flag, it is possible that a signal is handled // when we are exiting main, inside destructors of database::GraphDb and diff --git a/src/memgraph_init.hpp b/src/memgraph_init.hpp index f8902a740..c8ac4de90 100644 --- a/src/memgraph_init.hpp +++ b/src/memgraph_init.hpp @@ -100,20 +100,6 @@ class BoltSession final io::network::Endpoint endpoint_; }; -/// Class that implements ResultStream API for Kafka. -/// -/// Kafka doesn't need to stream the import results back to the client so we -/// don't need any functionality here. -class KafkaResultStream { - public: - void Result(const std::vector &) {} -}; - -/// Writes data streamed from kafka to memgraph. -void KafkaStreamWriter( - SessionData &session_data, const std::string &query, - const std::map ¶ms); - /// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT. /// In most cases you don't have to call this. If you are using a custom server /// startup function for `WithInit`, then you probably need to use this to diff --git a/src/query/frontend/ast/ast.lcp b/src/query/frontend/ast/ast.lcp index 5d73fa209..56fef5fa5 100644 --- a/src/query/frontend/ast/ast.lcp +++ b/src/query/frontend/ast/ast.lcp @@ -2061,7 +2061,7 @@ cpp<# show-users-for-role) (:serialize)) (lcp:define-enum privilege - (create delete match merge set remove index stats auth stream constraint + (create delete match merge set remove index stats auth constraint dump) (:serialize)) #>cpp @@ -2096,64 +2096,10 @@ const std::vector kPrivilegesAll = { AuthQuery::Privilege::MATCH, AuthQuery::Privilege::MERGE, AuthQuery::Privilege::SET, AuthQuery::Privilege::REMOVE, AuthQuery::Privilege::INDEX, AuthQuery::Privilege::STATS, - AuthQuery::Privilege::AUTH, AuthQuery::Privilege::STREAM, + AuthQuery::Privilege::AUTH, AuthQuery::Privilege::CONSTRAINT, AuthQuery::Privilege::DUMP}; cpp<# -(lcp:define-class stream-query (query) - ((action "Action" :scope :public) - (stream-name "std::string" :scope :public) - (stream-uri "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (stream-topic "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (transform-uri "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (batch-interval-in-ms "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (batch-size "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression")) - (limit-batches "Expression *" :scope :public :initval "nullptr" - :slk-save #'slk-save-ast-pointer - :slk-load (slk-load-ast-pointer "Expression"))) - (:public - (lcp:define-enum action - (create-stream drop-stream show-streams start-stream stop-stream - start-all-streams stop-all-streams test-stream) - (:serialize)) - - #>cpp - StreamQuery() = default; - - DEFVISITABLE(QueryVisitor); - cpp<#) - (:protected - #>cpp - StreamQuery(Action action, std::string stream_name, Expression *stream_uri, - Expression *stream_topic, Expression *transform_uri, - Expression *batch_interval_in_ms, Expression *batch_size, - Expression *limit_batches) - : action_(action), - stream_name_(std::move(stream_name)), - stream_uri_(stream_uri), - stream_topic_(stream_topic), - transform_uri_(transform_uri), - 
batch_interval_in_ms_(batch_interval_in_ms), - batch_size_(batch_size), - limit_batches_(limit_batches) {} - cpp<#) - (:private - #>cpp - friend class AstStorage; - cpp<#) - (:serialize (:slk)) - (:clone)) - (lcp:define-class info-query (query) ((info-type "InfoType" :scope :public)) (:public diff --git a/src/query/frontend/ast/ast_visitor.hpp b/src/query/frontend/ast/ast_visitor.hpp index c48e0c537..aa2625df5 100644 --- a/src/query/frontend/ast/ast_visitor.hpp +++ b/src/query/frontend/ast/ast_visitor.hpp @@ -65,7 +65,6 @@ class AuthQuery; class ExplainQuery; class ProfileQuery; class IndexQuery; -class StreamQuery; class InfoQuery; class ConstraintQuery; class RegexMatch; @@ -113,7 +112,7 @@ class ExpressionVisitor template class QueryVisitor : public ::utils::Visitor {}; } // namespace query diff --git a/src/query/frontend/ast/cypher_main_visitor.cpp b/src/query/frontend/ast/cypher_main_visitor.cpp index e8c821105..c69c26770 100644 --- a/src/query/frontend/ast/cypher_main_visitor.cpp +++ b/src/query/frontend/ast/cypher_main_visitor.cpp @@ -178,15 +178,6 @@ antlrcpp::Any CypherMainVisitor::visitAuthQuery( return auth_query; } -antlrcpp::Any CypherMainVisitor::visitStreamQuery( - MemgraphCypher::StreamQueryContext *ctx) { - CHECK(ctx->children.size() == 1) - << "StreamQuery should have exactly one child!"; - auto *stream_query = ctx->children[0]->accept(this).as(); - query_ = stream_query; - return stream_query; -} - antlrcpp::Any CypherMainVisitor::visitDumpQuery( MemgraphCypher::DumpQueryContext *ctx) { auto *dump_query = storage_->Create(); @@ -538,7 +529,6 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege( if (ctx->INDEX()) return AuthQuery::Privilege::INDEX; if (ctx->STATS()) return AuthQuery::Privilege::STATS; if (ctx->AUTH()) return AuthQuery::Privilege::AUTH; - if (ctx->STREAM()) return AuthQuery::Privilege::STREAM; if (ctx->CONSTRAINT()) return AuthQuery::Privilege::CONSTRAINT; LOG(FATAL) << "Should not get here - unknown privilege!"; } @@ -576,152 +566,6 @@ antlrcpp::Any CypherMainVisitor::visitShowUsersForRole( return auth; } -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitCreateStream( - MemgraphCypher::CreateStreamContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::CREATE_STREAM; - stream_query->stream_name_ = ctx->streamName()->getText(); - if (!ctx->streamUri->StringLiteral()) { - throw SyntaxException("Stream URI should be a string literal."); - } - stream_query->stream_uri_ = ctx->streamUri->accept(this); - if (!ctx->streamTopic->StringLiteral()) { - throw SyntaxException("Topic should be a string literal."); - } - stream_query->stream_topic_ = ctx->streamTopic->accept(this); - if (!ctx->transformUri->StringLiteral()) { - throw SyntaxException("Transform URI should be a string literal."); - } - stream_query->transform_uri_ = ctx->transformUri->accept(this); - if (ctx->batchIntervalOption()) { - stream_query->batch_interval_in_ms_ = - ctx->batchIntervalOption()->accept(this); - } - if (ctx->batchSizeOption()) { - stream_query->batch_size_ = ctx->batchSizeOption()->accept(this); - } - return stream_query; -} - -/** - * @return Expression* - */ -antlrcpp::Any CypherMainVisitor::visitBatchIntervalOption( - MemgraphCypher::BatchIntervalOptionContext *ctx) { - if (!ctx->literal()->numberLiteral() || - !ctx->literal()->numberLiteral()->integerLiteral()) { - throw SyntaxException("Batch interval should be an integer."); - } - return ctx->literal()->accept(this); -} - -/** - * @return Expression* - */ 
-antlrcpp::Any CypherMainVisitor::visitBatchSizeOption( - MemgraphCypher::BatchSizeOptionContext *ctx) { - if (!ctx->literal()->numberLiteral() || - !ctx->literal()->numberLiteral()->integerLiteral()) { - throw SyntaxException("Batch size should be an integer."); - } - return ctx->literal()->accept(this); -} - -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitDropStream( - MemgraphCypher::DropStreamContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::DROP_STREAM; - stream_query->stream_name_ = ctx->streamName()->getText(); - return stream_query; -} - -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitShowStreams( - MemgraphCypher::ShowStreamsContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::SHOW_STREAMS; - return stream_query; -} - -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitStartStream( - MemgraphCypher::StartStreamContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::START_STREAM; - stream_query->stream_name_ = std::string(ctx->streamName()->getText()); - if (ctx->limitBatchesOption()) { - stream_query->limit_batches_ = ctx->limitBatchesOption()->accept(this); - } - return stream_query; -} - -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitStopStream( - MemgraphCypher::StopStreamContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::STOP_STREAM; - stream_query->stream_name_ = std::string(ctx->streamName()->getText()); - return stream_query; -} - -/** - * @return Expression* - */ -antlrcpp::Any CypherMainVisitor::visitLimitBatchesOption( - MemgraphCypher::LimitBatchesOptionContext *ctx) { - if (!ctx->literal()->numberLiteral() || - !ctx->literal()->numberLiteral()->integerLiteral()) { - throw SyntaxException("Batch limit should be an integer."); - } - return ctx->literal()->accept(this); -} - -/* - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitStartAllStreams( - MemgraphCypher::StartAllStreamsContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::START_ALL_STREAMS; - return stream_query; -} - -/* - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitStopAllStreams( - MemgraphCypher::StopAllStreamsContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::STOP_ALL_STREAMS; - return stream_query; -} - -/** - * @return StreamQuery* - */ -antlrcpp::Any CypherMainVisitor::visitTestStream( - MemgraphCypher::TestStreamContext *ctx) { - auto *stream_query = storage_->Create(); - stream_query->action_ = StreamQuery::Action::TEST_STREAM; - stream_query->stream_name_ = std::string(ctx->streamName()->getText()); - if (ctx->limitBatchesOption()) { - stream_query->limit_batches_ = ctx->limitBatchesOption()->accept(this); - } - return stream_query; -} - antlrcpp::Any CypherMainVisitor::visitCypherReturn( MemgraphCypher::CypherReturnContext *ctx) { auto *return_clause = storage_->Create(); diff --git a/src/query/frontend/ast/cypher_main_visitor.hpp b/src/query/frontend/ast/cypher_main_visitor.hpp index f9851bd9e..24842f519 100644 --- a/src/query/frontend/ast/cypher_main_visitor.hpp +++ b/src/query/frontend/ast/cypher_main_visitor.hpp @@ -181,12 +181,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { */ antlrcpp::Any 
visitAuthQuery(MemgraphCypher::AuthQueryContext *ctx) override; - /** - * @return StreamQuery* - */ - antlrcpp::Any visitStreamQuery( - MemgraphCypher::StreamQueryContext *ctx) override; - /** * @return DumpQuery* */ @@ -326,63 +320,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor { antlrcpp::Any visitShowUsersForRole( MemgraphCypher::ShowUsersForRoleContext *ctx) override; - /** - * @return StreamQuery* - */ - antlrcpp::Any visitCreateStream( - MemgraphCypher::CreateStreamContext *ctx) override; - - antlrcpp::Any visitBatchIntervalOption( - MemgraphCypher::BatchIntervalOptionContext *ctx) override; - - antlrcpp::Any visitBatchSizeOption( - MemgraphCypher::BatchSizeOptionContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitDropStream( - MemgraphCypher::DropStreamContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitShowStreams( - MemgraphCypher::ShowStreamsContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitStartStream( - MemgraphCypher::StartStreamContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitStopStream( - MemgraphCypher::StopStreamContext *ctx) override; - - antlrcpp::Any visitLimitBatchesOption( - MemgraphCypher::LimitBatchesOptionContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitStartAllStreams( - MemgraphCypher::StartAllStreamsContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitStopAllStreams( - MemgraphCypher::StopAllStreamsContext *ctx) override; - - /** - * @return StreamQuery* - */ - antlrcpp::Any visitTestStream( - MemgraphCypher::TestStreamContext *ctx) override; - /** * @return Return* */ diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 index 0a8300f81..3d484b055 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypher.g4 @@ -9,10 +9,7 @@ import Cypher ; memgraphCypherKeyword : cypherKeyword | ALTER | AUTH - | BATCH - | BATCHES | CLEAR - | DATA | DATABASE | DENY | DROP @@ -21,24 +18,13 @@ memgraphCypherKeyword : cypherKeyword | FROM | GRANT | IDENTIFIED - | INTERVAL - | K_TEST - | KAFKA - | LOAD | PASSWORD | PRIVILEGES | REVOKE | ROLE | ROLES - | SIZE - | START | STATS - | STOP - | STREAM - | STREAMS | TO - | TOPIC - | TRANSFORM | USER | USERS ; @@ -55,7 +41,6 @@ query : cypherQuery | infoQuery | constraintQuery | authQuery - | streamQuery | dumpQuery ; @@ -104,7 +89,7 @@ denyPrivilege : DENY ( ALL PRIVILEGES | privileges=privilegeList ) TO userOrRole revokePrivilege : REVOKE ( ALL PRIVILEGES | privileges=privilegeList ) FROM userOrRole=userOrRoleName ; privilege : CREATE | DELETE | MATCH | MERGE | SET - | REMOVE | INDEX | STATS | AUTH | STREAM | CONSTRAINT | DUMP ; + | REMOVE | INDEX | STATS | AUTH | CONSTRAINT | DUMP ; privilegeList : privilege ( ',' privilege )* ; @@ -114,40 +99,4 @@ showRoleForUser : SHOW ROLE FOR user=userOrRoleName ; showUsersForRole : SHOW USERS FOR role=userOrRoleName ; -streamQuery : createStream - | dropStream - | showStreams - | startStream - | stopStream - | startAllStreams - | stopAllStreams - | testStream - ; - -streamName : symbolicName ; - -createStream : CREATE STREAM streamName AS LOAD DATA KAFKA -streamUri=literal WITH TOPIC streamTopic=literal WITH TRANSFORM -transformUri=literal ( batchIntervalOption )? ( batchSizeOption )? 
; - -batchIntervalOption : BATCH INTERVAL literal ; - -batchSizeOption : BATCH SIZE literal ; - -dropStream : DROP STREAM streamName ; - -showStreams : SHOW STREAMS ; - -startStream : START STREAM streamName ( limitBatchesOption )? ; - -stopStream : STOP STREAM streamName ; - -limitBatchesOption : LIMIT limitBatches=literal BATCHES ; - -startAllStreams : START ALL STREAMS ; - -stopAllStreams : STOP ALL STREAMS ; - -testStream : K_TEST STREAM streamName ( limitBatchesOption )? ; - dumpQuery: DUMP DATABASE ; diff --git a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 index 354087030..e191f148c 100644 --- a/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 +++ b/src/query/frontend/opencypher/grammar/MemgraphCypherLexer.g4 @@ -12,10 +12,7 @@ import CypherLexer ; ALTER : A L T E R ; AUTH : A U T H ; -BATCH : B A T C H ; -BATCHES : B A T C H E S ; CLEAR : C L E A R ; -DATA : D A T A ; DATABASE : D A T A B A S E ; DENY : D E N Y ; DROP : D R O P ; @@ -25,23 +22,12 @@ FROM : F R O M ; GRANT : G R A N T ; GRANTS : G R A N T S ; IDENTIFIED : I D E N T I F I E D ; -INTERVAL : I N T E R V A L ; -K_TEST : T E S T ; -KAFKA : K A F K A ; -LOAD : L O A D ; PASSWORD : P A S S W O R D ; PRIVILEGES : P R I V I L E G E S ; REVOKE : R E V O K E ; ROLE : R O L E ; ROLES : R O L E S ; -SIZE : S I Z E ; -START : S T A R T ; STATS : S T A T S ; -STOP : S T O P ; -STREAM : S T R E A M ; -STREAMS : S T R E A M S ; TO : T O ; -TOPIC : T O P I C ; -TRANSFORM : T R A N S F O R M ; USER : U S E R ; USERS : U S E R S ; diff --git a/src/query/frontend/semantic/required_privileges.cpp b/src/query/frontend/semantic/required_privileges.cpp index bb750c5a6..e090e697d 100644 --- a/src/query/frontend/semantic/required_privileges.cpp +++ b/src/query/frontend/semantic/required_privileges.cpp @@ -18,10 +18,6 @@ class PrivilegeExtractor : public QueryVisitor, void Visit(AuthQuery &) override { AddPrivilege(AuthQuery::Privilege::AUTH); } - void Visit(StreamQuery &) override { - AddPrivilege(AuthQuery::Privilege::STREAM); - } - void Visit(ExplainQuery &query) override { query.cypher_query_->Accept(*this); } diff --git a/src/query/frontend/stripped_lexer_constants.hpp b/src/query/frontend/stripped_lexer_constants.hpp index de477051f..7d99ca375 100644 --- a/src/query/frontend/stripped_lexer_constants.hpp +++ b/src/query/frontend/stripped_lexer_constants.hpp @@ -88,9 +88,8 @@ const trie::Trie kKeywords = { "when", "then", "else", "end", "count", "filter", "extract", "any", "none", "single", "true", "false", "reduce", "coalesce", "user", "password", "alter", "drop", - "stream", "streams", "load", "data", "kafka", "transform", - "batch", "interval", "show", "start", "stats", "stop", - "size", "topic", "test", "unique", "explain", "profile", + "show", "stats", + "unique", "explain", "profile", "storage", "index", "info", "exists", "assert", "constraint", "node", "key", "dump", "database"}; diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp index 3315c406c..df3c525a8 100644 --- a/src/query/interpreter.cpp +++ b/src/query/interpreter.cpp @@ -10,8 +10,6 @@ #endif #include "glue/auth.hpp" #include "glue/communication.hpp" -#include "integrations/kafka/exceptions.hpp" -#include "integrations/kafka/streams.hpp" #include "query/exceptions.hpp" #include "query/frontend/ast/cypher_main_visitor.hpp" #include "query/frontend/opencypher/parser.hpp" @@ -507,170 +505,6 @@ Callback HandleAuthQuery(AuthQuery *auth_query, auth::Auth *auth, } } -Callback 
HandleStreamQuery(StreamQuery *stream_query, - integrations::kafka::Streams *streams, - const Parameters ¶meters, - DbAccessor *db_accessor) { - // Empty frame and symbol table for evaluation of expressions. This is OK - // since all expressions should be literals or parameter lookups. - Frame frame(0); - SymbolTable symbol_table; - EvaluationContext evaluation_context; - // TODO: MemoryResource for EvaluationContext, it should probably be passed as - // the argument to Callback. - evaluation_context.timestamp = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - evaluation_context.parameters = parameters; - ExpressionEvaluator eval(&frame, symbol_table, evaluation_context, - db_accessor, storage::View::OLD); - - std::string stream_name = stream_query->stream_name_; - auto stream_uri = - EvaluateOptionalExpression(stream_query->stream_uri_, &eval); - auto stream_topic = - EvaluateOptionalExpression(stream_query->stream_topic_, &eval); - auto transform_uri = - EvaluateOptionalExpression(stream_query->transform_uri_, &eval); - auto batch_interval_in_ms = - EvaluateOptionalExpression(stream_query->batch_interval_in_ms_, &eval); - auto batch_size = - EvaluateOptionalExpression(stream_query->batch_size_, &eval); - auto limit_batches = - EvaluateOptionalExpression(stream_query->limit_batches_, &eval); - - Callback callback; - - switch (stream_query->action_) { - case StreamQuery::Action::CREATE_STREAM: - callback.fn = [streams, stream_name, stream_uri, stream_topic, - transform_uri, batch_interval_in_ms, batch_size] { - CHECK(stream_uri.IsString()); - CHECK(stream_topic.IsString()); - CHECK(transform_uri.IsString()); - CHECK(batch_interval_in_ms.IsInt() || batch_interval_in_ms.IsNull()); - CHECK(batch_size.IsInt() || batch_size.IsNull()); - - integrations::kafka::StreamInfo info; - info.stream_name = stream_name; - - info.stream_uri = stream_uri.ValueString(); - info.stream_topic = stream_topic.ValueString(); - info.transform_uri = transform_uri.ValueString(); - info.batch_interval_in_ms = - batch_interval_in_ms.IsInt() - ? std::make_optional(batch_interval_in_ms.ValueInt()) - : std::nullopt; - info.batch_size = batch_size.IsInt() - ? std::make_optional(batch_size.ValueInt()) - : std::nullopt; - - try { - streams->Create(info); - } catch (const integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::DROP_STREAM: - callback.fn = [streams, stream_name] { - try { - streams->Drop(stream_name); - } catch (const integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::SHOW_STREAMS: - callback.header = {"name", "uri", "topic", "transform", "status"}; - callback.fn = [streams] { - std::vector> status; - for (const auto &stream : streams->Show()) { - status.push_back(std::vector{ - TypedValue(stream.stream_name), TypedValue(stream.stream_uri), - TypedValue(stream.stream_topic), TypedValue(stream.transform_uri), - TypedValue(stream.stream_status)}); - } - return status; - }; - return callback; - case StreamQuery::Action::START_STREAM: - callback.fn = [streams, stream_name, limit_batches] { - CHECK(limit_batches.IsInt() || limit_batches.IsNull()); - - try { - streams->Start(stream_name, - limit_batches.IsInt() - ? 
std::make_optional(limit_batches.ValueInt()) - : std::nullopt); - } catch (integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::STOP_STREAM: - callback.fn = [streams, stream_name] { - try { - streams->Stop(stream_name); - } catch (integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::START_ALL_STREAMS: - callback.fn = [streams] { - try { - streams->StartAll(); - } catch (integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::STOP_ALL_STREAMS: - callback.fn = [streams] { - try { - streams->StopAll(); - } catch (integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return std::vector>(); - }; - return callback; - case StreamQuery::Action::TEST_STREAM: - callback.header = {"query", "params"}; - callback.fn = [streams, stream_name, limit_batches] { - CHECK(limit_batches.IsInt() || limit_batches.IsNull()); - - std::vector> rows; - try { - auto results = streams->Test( - stream_name, limit_batches.IsInt() - ? std::make_optional(limit_batches.ValueInt()) - : std::nullopt); - for (const auto &result : results) { - std::map params; - for (const auto ¶m : result.second) { - params.emplace(param.first, glue::ToTypedValue(param.second)); - } - - rows.emplace_back(std::vector{TypedValue(result.first), - TypedValue(params)}); - } - } catch (integrations::kafka::KafkaStreamException &e) { - throw QueryRuntimeException(e.what()); - } - return rows; - }; - return callback; - } -} - Callback HandleIndexQuery(IndexQuery *index_query, std::function invalidate_plan_cache, DbAccessor *db_accessor) { @@ -1274,19 +1108,6 @@ Interpreter::Results Interpreter::Prepare( } callback = HandleAuthQuery(auth_query, interpreter_context_->auth, parsed_query.parameters, db_accessor); -#endif - } else if (auto *stream_query = - utils::Downcast(parsed_query.query)) { -#ifdef MG_SINGLE_NODE_HA - throw utils::NotYetImplemented( - "Graph streams are not yet supported in Memgraph HA instance."); -#else - if (in_explicit_transaction_) { - throw StreamClauseInMulticommandTxException(); - } - callback = - HandleStreamQuery(stream_query, interpreter_context_->kafka_streams, - parsed_query.parameters, db_accessor); #endif } else if (auto *info_query = utils::Downcast(parsed_query.query)) { diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp index 136bf2bba..849d26a48 100644 --- a/src/query/interpreter.hpp +++ b/src/query/interpreter.hpp @@ -25,10 +25,6 @@ namespace auth { class Auth; } // namespace auth -namespace integrations::kafka { -class Streams; -} // namespace integrations::kafka - namespace query { static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U; @@ -135,7 +131,6 @@ struct InterpreterContext { bool is_tsc_available{utils::CheckAvailableTSC()}; auth::Auth *auth{nullptr}; - integrations::kafka::Streams *kafka_streams{nullptr}; utils::SkipList ast_cache; utils::SkipList plan_cache; diff --git a/tests/feature_benchmark/CMakeLists.txt b/tests/feature_benchmark/CMakeLists.txt index 517a6bedd..3ad8956f6 100644 --- a/tests/feature_benchmark/CMakeLists.txt +++ b/tests/feature_benchmark/CMakeLists.txt @@ -1,5 +1,2 @@ -# kafka test binaries -add_subdirectory(kafka) - # ha test binaries add_subdirectory(ha) diff --git 
a/tests/feature_benchmark/apollo_runs.yaml b/tests/feature_benchmark/apollo_runs.yaml index c81bad296..cc5c063fd 100644 --- a/tests/feature_benchmark/apollo_runs.yaml +++ b/tests/feature_benchmark/apollo_runs.yaml @@ -1,14 +1,3 @@ -- name: feature_benchmark__kafka - cd: kafka - commands: ./runner.sh - infiles: - - runner.sh # runner script - - transform.py # transform script - - generate.py # dataset generator script - - ../../../build_release/tests/feature_benchmark/kafka/kafka.py # kafka script - - ../../../build_release/tests/feature_benchmark/kafka/benchmark # benchmark binary - enable_network: true - - name: feature_benchmark__ha__read cd: ha/read commands: ./runner.sh diff --git a/tests/feature_benchmark/kafka/CMakeLists.txt b/tests/feature_benchmark/kafka/CMakeLists.txt deleted file mode 100644 index fb4800cd8..000000000 --- a/tests/feature_benchmark/kafka/CMakeLists.txt +++ /dev/null @@ -1,9 +0,0 @@ -set(target_name memgraph__feature_benchmark__kafka) - -set(benchmark_target_name ${target_name}__benchmark) -add_executable(${benchmark_target_name} benchmark.cpp) -set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark) -target_link_libraries(${benchmark_target_name} mg-single-node kvstore_lib) - -# Copy kafka.py to the feature integration kafka folder -configure_file(${PROJECT_SOURCE_DIR}/src/integrations/kafka/kafka.py ${CMAKE_CURRENT_BINARY_DIR} COPYONLY) diff --git a/tests/feature_benchmark/kafka/benchmark.cpp b/tests/feature_benchmark/kafka/benchmark.cpp deleted file mode 100644 index e7c44fa4b..000000000 --- a/tests/feature_benchmark/kafka/benchmark.cpp +++ /dev/null @@ -1,116 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include "json/json.hpp" - -#include "database/single_node/graph_db.hpp" -#include "integrations/kafka/streams.hpp" -#include "memgraph_init.hpp" -#include "utils/flag_validation.hpp" -#include "utils/timer.hpp" - -using namespace std::literals::chrono_literals; - -DEFINE_int64(import_count, 0, "How many entries should we import."); -DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait."); -DEFINE_string(kafka_uri, "", "Kafka URI."); -DEFINE_string(topic_name, "", "Kafka topic."); -DEFINE_string(transform_uri, "", "Transform script URI."); -DEFINE_string(output_file, "", "Output file where shold the results be."); - -void KafkaBenchmarkMain() { - google::SetUsageMessage("Memgraph kafka benchmark database server"); - - auto durability_directory = std::filesystem::path(FLAGS_durability_directory); - - auth::Auth auth{durability_directory / "auth"}; - - audit::Log audit_log{durability_directory / "audit", - audit::kBufferSizeDefault, - audit::kBufferFlushIntervalMillisDefault}; - - database::GraphDb db; - query::InterpreterContext interpreter_context{&db}; - SessionData session_data{&db, &interpreter_context, &auth, &audit_log}; - - std::atomic query_counter{0}; - std::atomic timeout_reached{false}; - std::atomic benchmark_finished{false}; - - integrations::kafka::Streams kafka_streams{ - std::filesystem::path(FLAGS_durability_directory) / "streams", - [&session_data, &query_counter]( - const std::string &query, - const std::map ¶ms) { - KafkaStreamWriter(session_data, query, params); - query_counter++; - }}; - - interpreter_context.auth = &auth; - interpreter_context.kafka_streams = &kafka_streams; - - std::string stream_name = "benchmark"; - - integrations::kafka::StreamInfo stream_info; - stream_info.stream_name = stream_name; - 
stream_info.stream_uri = FLAGS_kafka_uri; - stream_info.stream_topic = FLAGS_topic_name; - stream_info.transform_uri = FLAGS_transform_uri; - - kafka_streams.Create(stream_info); - kafka_streams.StartAll(); - - // Kickoff a thread that will timeout after FLAGS_timeout seconds - std::thread timeout_thread_ = - std::thread([&timeout_reached, &benchmark_finished]() { - utils::ThreadSetName("BenchTimeout"); - for (int64_t i = 0; i < FLAGS_timeout; ++i) { - std::this_thread::sleep_for(1s); - if (benchmark_finished.load()) return; - } - - timeout_reached.store(true); - }); - - // Wait for the import to start - while (!timeout_reached.load() && query_counter.load() == 0) { - std::this_thread::sleep_for(1ms); - } - - int64_t query_count_start = query_counter.load(); - utils::Timer timer; - - // Wait for the import to finish - while (!timeout_reached.load() && query_counter.load() < FLAGS_import_count) { - std::this_thread::sleep_for(1ms); - } - - double duration = timer.Elapsed().count(); - kafka_streams.StopAll(); - kafka_streams.Drop(stream_name); - - benchmark_finished.store(true); - if (timeout_thread_.joinable()) timeout_thread_.join(); - - int64_t writes = query_counter.load() - query_count_start; - double write_per_second = writes / duration; - - std::ofstream output(FLAGS_output_file); - output << "duration " << duration << std::endl; - output << "executed_writes " << writes << std::endl; - output << "write_per_second " << write_per_second << std::endl; - output.close(); -} - -int main(int argc, char **argv) { - return WithInit(argc, argv, KafkaBenchmarkMain); -} diff --git a/tests/feature_benchmark/kafka/generate.py b/tests/feature_benchmark/kafka/generate.py deleted file mode 100644 index d70bb59e4..000000000 --- a/tests/feature_benchmark/kafka/generate.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- - -''' -Kafka benchmark dataset generator. -''' - - -import random -import sys -from argparse import ArgumentParser - - -def get_edge(): - from_node = random.randint(0, args.nodes) - to_node = random.randint(0, args.nodes) - - while from_node == to_node: - to_node = random.randint(0, args.nodes) - return (from_node, to_node) - - -def parse_args(): - argp = ArgumentParser(description=__doc__) - argp.add_argument("--nodes", type=int, default=100, help="Number of nodes.") - argp.add_argument("--edges", type=int, default=30, help="Number of edges.") - return argp.parse_args() - - -args = parse_args() -edges = set() - -for i in range(args.nodes): - print("%d\n" % i) - -for i in range(args.edges): - edge = get_edge() - while edge in edges: - edge = get_edge() - - edges.add(edge) - print("%d %d\n" % edge) - -sys.exit(0) diff --git a/tests/feature_benchmark/kafka/runner.sh b/tests/feature_benchmark/kafka/runner.sh deleted file mode 100755 index 6fb77ade2..000000000 --- a/tests/feature_benchmark/kafka/runner.sh +++ /dev/null @@ -1,143 +0,0 @@ -#!/bin/bash - -## Helper functions - -function wait_for_server { - port=$1 - while ! nc -z -w 1 127.0.0.1 $port; do - sleep 0.1 - done - sleep 1 -} - -function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; } -function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; } -function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; } - - -## Environment setup - -# Get script location. -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -cd "$DIR" - -# Create a temporary directory. 
-tmpdir=/tmp/memgraph_benchmark_kafka -if [ -d $tmpdir ]; then - rm -rf $tmpdir -fi -mkdir -p $tmpdir -cd $tmpdir - -# Download the kafka binaries. -kafka="kafka_2.11-2.0.0" -wget -nv http://deps.memgraph.io/$kafka.tgz -tar -xf $kafka.tgz -mv $kafka kafka - -# Find memgraph binaries. -binary_dir="$DIR/../../../build" -if [ ! -d $binary_dir ]; then - binary_dir="$DIR/../../../build_release" -fi - -# Cleanup old kafka logs. -if [ -d /tmp/kafka-logs ]; then - rm -rf /tmp/kafka-logs -fi -if [ -d /tmp/zookeeper ]; then - rm -rf /tmp/zookeeper -fi - -# Results for apollo -RESULTS="$DIR/.apollo_measurements" - -# Benchmark parameters -NODES=100000 -EDGES=10000 - - -## Startup - -# Start the zookeeper process and wait for it to start. -echo_info "Starting zookeeper" -./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties -wait_for_server 2181 -echo_success "Started zookeeper" - -# Start the kafka process and wait for it to start. -echo_info "Starting kafka" -./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties -wait_for_server 9092 -echo_success "Started kafka" - -# Create the kafka topic. -echo_info "Creating kafka topic test" -./kafka/bin/kafka-topics.sh --create \ - --zookeeper 127.0.0.1:2181 \ - --replication-factor 1 \ - --partitions 1 \ - --topic test -echo_success "Created kafka topic test" - -# Start a http server to serve the transform script. -echo_info "Starting Python HTTP server" -mkdir serve -cd serve -cp "$DIR/transform.py" transform.py -python3 -m http.server & -http_server_pid=$! -wait_for_server 8000 -cd .. -echo_success "Started Python HTTP server" - - -# Start the memgraph process and wait for it to start. -echo_info "Starting kafka benchmark" -$binary_dir/tests/feature_benchmark/kafka/benchmark \ - --import-count=$(($NODES + $EDGES)) \ - --timeout=60 \ - --kafka-uri="127.0.0.1:9092" \ - --topic-name="test" \ - --transform-uri="127.0.0.1:8000/transform.py" \ - --output-file=$RESULTS & -pid=$! - -# Allow benchmark to initialize -sleep 5 - -echo_info "Generating kafka messages" -python3 "$DIR/generate.py" --nodes $NODES --edges $EDGES | \ - ./kafka/bin/kafka-console-producer.sh \ - --broker-list 127.0.0.1:9092 \ - --topic test \ - > /dev/null -echo_success "Finished generating kafka messages" - -wait -n $pid -code=$? - -if [ $code -eq 0 ]; then - echo_success "Benchmark finished successfully" -else - echo_failure "Benchmark didn't finish successfully" -fi - -## Cleanup - -echo_info "Starting test cleanup" - -# Shutdown the http server. -kill $http_server_pid -wait -n - -# Shutdown the kafka process. -./kafka/bin/kafka-server-stop.sh - -# Shutdown the zookeeper process. 
-./kafka/bin/zookeeper-server-stop.sh - -echo_success "Test cleanup done" - -[ $code -ne 0 ] && exit $code -exit 0 diff --git a/tests/feature_benchmark/kafka/transform.py b/tests/feature_benchmark/kafka/transform.py deleted file mode 100644 index 1fd87ca9b..000000000 --- a/tests/feature_benchmark/kafka/transform.py +++ /dev/null @@ -1,15 +0,0 @@ -index_done = False - -def stream(batch): - global index_done - ret = [] - if not index_done: - ret.append(("CREATE INDEX ON :node(num)", {})) - index_done = True - for item in batch: - message = item.decode("utf-8").strip().split() - if len(message) == 1: - ret.append(("CREATE (:node {num: $num})", {"num": message[0]})) - elif len(message) == 2: - ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) - return ret diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index 0df33467e..94466f2e8 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -7,9 +7,6 @@ add_subdirectory(ssl) # transactions test binaries add_subdirectory(transactions) -# kafka test binaries -add_subdirectory(kafka) - # auth test binaries add_subdirectory(auth) diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml index f197532dc..721429c13 100644 --- a/tests/integration/apollo_runs.yaml +++ b/tests/integration/apollo_runs.yaml @@ -23,17 +23,6 @@ - ../../../build_debug/memgraph # memgraph binary - ../../../build_debug/tests/integration/transactions/tester # tester binary -- name: integration__kafka - cd: kafka - commands: ./runner.sh - infiles: - - runner.sh # runner script - - transform.py # transform script - - ../../../build_debug/memgraph # memgraph binary - - ../../../build_debug/kafka.py # kafka script - - ../../../build_debug/tests/integration/kafka/tester # tester binary - enable_network: true - - name: integration__auth cd: auth commands: TIMEOUT=820 ./runner.py diff --git a/tests/integration/auth/runner.py b/tests/integration/auth/runner.py index 6a22b60bc..bf3dce743 100755 --- a/tests/integration/auth/runner.py +++ b/tests/integration/auth/runner.py @@ -141,41 +141,6 @@ QUERIES = [ "SHOW USERS FOR test_role", ("AUTH",) ), - - # STREAM - ( - "CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH TOPIC " - "'test' WITH TRANSFORM 'http://127.0.0.1/transform.py'", - ("STREAM",) - ), - ( - "DROP STREAM strim", - ("STREAM",) - ), - ( - "SHOW STREAMS", - ("STREAM",) - ), - ( - "START STREAM strim", - ("STREAM",) - ), - ( - "STOP STREAM strim", - ("STREAM",) - ), - ( - "START ALL STREAMS", - ("STREAM",) - ), - ( - "STOP ALL STREAMS", - ("STREAM",) - ), - ( - "TEST STREAM strim", - ("STREAM",) - ), ] UNAUTHORIZED_ERROR = "You are not authorized to execute this query! 
Please " \ diff --git a/tests/integration/kafka/CMakeLists.txt b/tests/integration/kafka/CMakeLists.txt deleted file mode 100644 index 74781a6a4..000000000 --- a/tests/integration/kafka/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -set(target_name memgraph__integration__kafka) -set(tester_target_name ${target_name}__tester) - -add_executable(${tester_target_name} tester.cpp) -set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester) -target_link_libraries(${tester_target_name} mg-communication) diff --git a/tests/integration/kafka/runner.sh b/tests/integration/kafka/runner.sh deleted file mode 100755 index da900ff8f..000000000 --- a/tests/integration/kafka/runner.sh +++ /dev/null @@ -1,163 +0,0 @@ -#!/bin/bash - -## Helper functions - -function wait_for_server { - port=$1 - while ! nc -z -w 1 127.0.0.1 $port; do - sleep 0.1 - done - sleep 1 -} - -function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; } -function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; } -function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; } - - -## Environment setup - -# Get script location. -DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -cd "$DIR" - -# Create a temporary directory. -tmpdir=/tmp/memgraph_integration_kafka -if [ -d $tmpdir ]; then - rm -rf $tmpdir -fi -mkdir -p $tmpdir -cd $tmpdir - -# Download the kafka binaries. -kafka="kafka_2.11-2.0.0" -wget -nv http://deps.memgraph.io/$kafka.tgz -tar -xf $kafka.tgz -mv $kafka kafka - -# Find memgraph binaries. -binary_dir="$DIR/../../../build" -if [ ! -d $binary_dir ]; then - binary_dir="$DIR/../../../build_debug" -fi - -# Cleanup old kafka logs. -if [ -d /tmp/kafka-logs ]; then - rm -rf /tmp/kafka-logs -fi -if [ -d /tmp/zookeeper ]; then - rm -rf /tmp/zookeeper -fi - - -## Startup - -# Start the zookeeper process and wait for it to start. -echo_info "Starting zookeeper" -./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties -wait_for_server 2181 -echo_success "Started zookeeper" - -# Start the kafka process and wait for it to start. -echo_info "Starting kafka" -./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties -wait_for_server 9092 -echo_success "Started kafka" - -# Start the memgraph process and wait for it to start. -echo_info "Starting memgraph" -$binary_dir/memgraph & -pid=$! -wait_for_server 7687 -echo_success "Started memgraph" - - -## Run the test - -# Create the kafka topic. -echo_info "Creating kafka topic" -./kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test -echo_success "Created kafka topic" - -# Start a http server to serve the transform script. -echo_info "Starting Python HTTP server" -mkdir serve -cd serve -cp "$DIR/transform.py" transform.py -python3 -m http.server & -wait_for_server 8000 -http_pid=$! -cd .. -echo_success "Started Python HTTP server" - -# Create and start the stream in memgraph. -echo_info "Defining and starting the stream in memgraph" -$binary_dir/tests/integration/kafka/tester --step start -code1=$? -if [ $code1 -eq 0 ]; then - echo_success "Defined and started the stream in memgraph" -else - echo_failure "Couldn't define and/or start the stream in memgraph" -fi - -# Wait for the streams to start up. -sleep 10 - -# Produce some messages. 
-echo_info "Producing kafka messages" -./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test < -#include - -#include "communication/bolt/client.hpp" -#include "io/network/endpoint.hpp" -#include "io/network/utils.hpp" -#include "utils/timer.hpp" - -DEFINE_string(address, "127.0.0.1", "Server address"); -DEFINE_int32(port, 7687, "Server port"); -DEFINE_string(username, "", "Username for the database"); -DEFINE_string(password, "", "Password for the database"); -DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); - -DEFINE_string(step, "", "Step that should be executed on the database."); - -void ExecuteQuery(communication::bolt::Client &client, - const std::string &query) { - try { - client.Execute(query, {}); - } catch (const communication::bolt::ClientQueryException &e) { - LOG(FATAL) << "Couldn't execute query '" << query - << "'! Received exception: " << e.what(); - } -} - -void ExecuteQueryAndCheck(communication::bolt::Client &client, - const std::string &query, int64_t value) { - try { - auto resp = client.Execute(query, {}); - if (resp.records.size() == 0 || resp.records[0].size() == 0) { - LOG(FATAL) << "The query '" << query << "' didn't return records!"; - } - if (resp.records[0][0].ValueInt() != value) { - LOG(FATAL) << "The query '" << query << "' was expected to return " - << value << " but it returned " - << resp.records[0][0].ValueInt() << "!"; - } - } catch (const communication::bolt::ClientQueryException &e) { - LOG(FATAL) << "Couldn't execute query '" << query - << "'! Received exception: " << e.what(); - } -} - -int main(int argc, char **argv) { - gflags::ParseCommandLineFlags(&argc, &argv, true); - google::InitGoogleLogging(argv[0]); - - communication::Init(); - - io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address), - FLAGS_port); - - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - - client.Connect(endpoint, FLAGS_username, FLAGS_password); - - if (FLAGS_step == "start") { - ExecuteQuery(client, - "CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH " - "TOPIC 'test' WITH TRANSFORM " - "'http://127.0.0.1:8000/transform.py'"); - ExecuteQuery(client, "START STREAM strim"); - } else if (FLAGS_step == "verify") { - ExecuteQueryAndCheck(client, - "UNWIND RANGE(1, 4) AS x MATCH (n:node {num: " - "toString(x)}) RETURN count(n)", - 4); - ExecuteQueryAndCheck(client, - "UNWIND [[1, 2], [3, 4], [1, 4]] AS x MATCH (n:node " - "{num: toString(x[0])})-[e:et]-(m:node {num: " - "toString(x[1])}) RETURN count(e)", - 3); - - } else { - LOG(FATAL) << "Unknown step " << FLAGS_step << "!"; - } - - return 0; -} diff --git a/tests/integration/kafka/transform.py b/tests/integration/kafka/transform.py deleted file mode 100644 index d0740f360..000000000 --- a/tests/integration/kafka/transform.py +++ /dev/null @@ -1,15 +0,0 @@ -index_done = False - -def stream(batch): - global index_done - ret = [] - if not index_done: - ret.append(("CREATE INDEX ON :node(num)", {})) - index_done = True - for item in batch: - message = item.decode("utf-8").strip().split() - if len(message) == 1: - ret.append(("MERGE (:node {num: $num})", {"num": message[0]})) - elif len(message) == 2: - ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) MERGE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) - return ret diff --git a/tests/unit/cypher_main_visitor.cpp b/tests/unit/cypher_main_visitor.cpp index 05940e0f8..34e8e474a 100644 --- 
a/tests/unit/cypher_main_visitor.cpp +++ b/tests/unit/cypher_main_visitor.cpp @@ -2343,250 +2343,6 @@ TEST_P(CypherMainVisitorTest, ShowUsersForRole) { SyntaxException); } -TEST_P(CypherMainVisitorTest, CreateStream) { - auto check_create_stream = - [this](std::string input, const std::string &stream_name, - const std::string &stream_uri, const std::string &stream_topic, - const std::string &transform_uri, - std::optional batch_interval_in_ms, - std::optional batch_size) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - EXPECT_EQ(stream_query->action_, StreamQuery::Action::CREATE_STREAM); - EXPECT_EQ(stream_query->stream_name_, stream_name); - ASSERT_TRUE(stream_query->stream_uri_); - ast_generator.CheckLiteral(stream_query->stream_uri_, - TypedValue(stream_uri)); - ASSERT_TRUE(stream_query->stream_topic_); - ast_generator.CheckLiteral(stream_query->stream_topic_, - TypedValue(stream_topic)); - ASSERT_TRUE(stream_query->transform_uri_); - ast_generator.CheckLiteral(stream_query->transform_uri_, - TypedValue(transform_uri)); - if (batch_interval_in_ms) { - ASSERT_TRUE(stream_query->batch_interval_in_ms_); - ast_generator.CheckLiteral(stream_query->batch_interval_in_ms_, - TypedValue(*batch_interval_in_ms)); - } else { - EXPECT_EQ(stream_query->batch_interval_in_ms_, nullptr); - } - if (batch_size) { - ASSERT_TRUE(stream_query->batch_size_); - ast_generator.CheckLiteral(stream_query->batch_size_, - TypedValue(*batch_size)); - } else { - EXPECT_EQ(stream_query->batch_size_, nullptr); - } - }; - - check_create_stream( - "CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' " - "WITH TOPIC 'tropika' " - "WITH TRANSFORM 'localhost/test.py'", - "stream", "localhost", "tropika", "localhost/test.py", std::nullopt, - std::nullopt); - - check_create_stream( - "CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' " - "WitH TopIC 'tropika' " - "WITH TRAnsFORM 'localhost/test.py' bAtCH inTErvAL 168", - "stream", "localhost", "tropika", "localhost/test.py", 168, std::nullopt); - - check_create_stream( - "CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' " - "WITH TopIC 'tropika' " - "WITH TRAnsFORM 'localhost/test.py' bAtCH SizE 17", - "stream", "localhost", "tropika", "localhost/test.py", std::nullopt, 17); - - check_create_stream( - "CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' " - "WitH TOPic 'tropika' " - "WITH TRAnsFORM 'localhost/test.py' bAtCH inTErvAL 168 Batch SIze 17", - "stream", "localhost", "tropika", "localhost/test.py", 168, 17); - - EXPECT_THROW(check_create_stream( - "CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' " - "WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 'jedan' ", - "stream", "localhost", "tropika", "localhost/test.py", 168, - std::nullopt), - SyntaxException); - EXPECT_THROW(check_create_stream( - "CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' " - "WITH TOPIC 'tropika' " - "WITH TRANSFORM 'localhost/test.py' BATCH SIZE 'jedan' ", - "stream", "localhost", "tropika", "localhost/test.py", - std::nullopt, 17), - SyntaxException); - EXPECT_THROW(check_create_stream( - "CREATE STREAM 123 AS LOAD DATA KAFKA 'localhost' " - "WITH TOPIC 'tropika' " - "WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 168 ", - "stream", "localhost", "tropika", "localhost/test.py", 168, - std::nullopt), - SyntaxException); - EXPECT_THROW( - check_create_stream("CREATE STREAM stream AS LOAD DATA KAFKA localhost " - "WITH TOPIC 'tropika' " - "WITH TRANSFORM 'localhost/test.py'", - "stream", 
"localhost", "tropika", "localhost/test.py", - std::nullopt, std::nullopt), - SyntaxException); - EXPECT_THROW(check_create_stream( - "CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' " - "WITH TOPIC 2" - "WITH TRANSFORM localhost/test.py BATCH INTERVAL 168 ", - "stream", "localhost", "tropika", "localhost/test.py", 168, - std::nullopt), - SyntaxException); - EXPECT_THROW(check_create_stream( - "CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' " - "WITH TOPIC 'tropika'" - "WITH TRANSFORM localhost/test.py BATCH INTERVAL 168 ", - "stream", "localhost", "tropika", "localhost/test.py", 168, - std::nullopt), - SyntaxException); -} - -TEST_P(CypherMainVisitorTest, DropStream) { - auto check_drop_stream = [this](std::string input, - const std::string &stream_name) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - EXPECT_EQ(stream_query->action_, StreamQuery::Action::DROP_STREAM); - EXPECT_EQ(stream_query->stream_name_, stream_name); - }; - - check_drop_stream("DRop stREAm stream", "stream"); - check_drop_stream("DRop stREAm strim", "strim"); - - EXPECT_THROW(check_drop_stream("DROp sTREAM", ""), SyntaxException); - - EXPECT_THROW(check_drop_stream("DROP STreAM 123", "123"), SyntaxException); - - EXPECT_THROW(check_drop_stream("DroP STREAM '123'", "123"), SyntaxException); -} - -TEST_P(CypherMainVisitorTest, ShowStreams) { - auto check_show_streams = [this](std::string input) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - EXPECT_EQ(stream_query->action_, StreamQuery::Action::SHOW_STREAMS); - }; - - check_show_streams("SHOW STREAMS"); - - EXPECT_THROW(check_show_streams("SHOW STREAMS lololo"), SyntaxException); -} - -TEST_P(CypherMainVisitorTest, StartStopStream) { - auto check_start_stop_stream = [this](std::string input, - const std::string &stream_name, - bool is_start, - std::optional limit_batches) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - - EXPECT_EQ(stream_query->stream_name_, stream_name); - EXPECT_EQ(stream_query->action_, is_start - ? 
StreamQuery::Action::START_STREAM - : StreamQuery::Action::STOP_STREAM); - - if (limit_batches) { - ASSERT_TRUE(is_start); - ASSERT_TRUE(stream_query->limit_batches_); - ast_generator.CheckLiteral(stream_query->limit_batches_, - TypedValue(*limit_batches)); - } else { - EXPECT_EQ(stream_query->limit_batches_, nullptr); - } - }; - - check_start_stop_stream("stARt STreaM STREAM", "STREAM", true, std::nullopt); - check_start_stop_stream("stARt STreaM strim", "strim", true, std::nullopt); - check_start_stop_stream("StARt STreAM strim LimIT 10 BATchES", "strim", true, - 10); - - check_start_stop_stream("StoP StrEAM strim", "strim", false, std::nullopt); - - EXPECT_THROW(check_start_stop_stream("staRT STReaM 'strim'", "strim", true, - std::nullopt), - SyntaxException); - EXPECT_THROW(check_start_stop_stream("sTART STReaM strim LImiT 'dva' BATCheS", - "strim", true, 2), - SyntaxException); - EXPECT_THROW(check_start_stop_stream("StoP STreAM 'strim'", "strim", false, - std::nullopt), - SyntaxException); - EXPECT_THROW(check_start_stop_stream("STOp sTREAM strim LIMit 2 baTCHES", - "strim", false, 2), - SyntaxException); -} - -TEST_P(CypherMainVisitorTest, StartStopAllStreams) { - auto check_start_stop_all_streams = [this](std::string input, bool is_start) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - EXPECT_EQ(stream_query->action_, - is_start ? StreamQuery::Action::START_ALL_STREAMS - : StreamQuery::Action::STOP_ALL_STREAMS); - }; - - check_start_stop_all_streams("STarT AlL StreAMs", true); - - check_start_stop_all_streams("StoP aLL STrEAMs", false); - - EXPECT_THROW(check_start_stop_all_streams("StaRT aLL STreAM", true), - SyntaxException); - - EXPECT_THROW(check_start_stop_all_streams("SToP AlL STREaM", false), - SyntaxException); -} - -TEST_P(CypherMainVisitorTest, TestStream) { - auto check_test_stream = [this](std::string input, - const std::string &stream_name, - std::optional limit_batches) { - auto &ast_generator = *GetParam(); - auto *stream_query = - dynamic_cast(ast_generator.ParseQuery(input)); - ASSERT_TRUE(stream_query); - EXPECT_EQ(stream_query->stream_name_, stream_name); - EXPECT_EQ(stream_query->action_, StreamQuery::Action::TEST_STREAM); - - if (limit_batches) { - ASSERT_TRUE(stream_query->limit_batches_); - ast_generator.CheckLiteral(stream_query->limit_batches_, - TypedValue(*limit_batches)); - } else { - EXPECT_EQ(stream_query->limit_batches_, nullptr); - } - }; - - check_test_stream("TesT STreaM strim", "strim", std::nullopt); - check_test_stream("TesT STreaM STREAM", "STREAM", std::nullopt); - check_test_stream("tESt STreAM STREAM LimIT 10 BATchES", "STREAM", 10); - - check_test_stream("Test StrEAM STREAM", "STREAM", std::nullopt); - - EXPECT_THROW(check_test_stream("tEST STReaM 'strim'", "strim", std::nullopt), - SyntaxException); - EXPECT_THROW( - check_test_stream("test STReaM strim LImiT 'dva' BATCheS", "strim", 2), - SyntaxException); - EXPECT_THROW(check_test_stream("test STreAM 'strim'", "strim", std::nullopt), - SyntaxException); -} - TEST_P(CypherMainVisitorTest, TestExplainRegularQuery) { auto &ast_generator = *GetParam(); EXPECT_TRUE(dynamic_cast( @@ -2604,12 +2360,6 @@ TEST_P(CypherMainVisitorTest, TestExplainAuthQuery) { EXPECT_THROW(ast_generator.ParseQuery("EXPLAIN SHOW ROLES"), SyntaxException); } -TEST_P(CypherMainVisitorTest, TestExplainStreamQuery) { - auto &ast_generator = *GetParam(); - EXPECT_THROW(ast_generator.ParseQuery("EXPLAIN SHOW STREAMS"), - 
SyntaxException); -} - TEST_P(CypherMainVisitorTest, TestProfileRegularQuery) { { auto &ast_generator = *GetParam(); @@ -2639,12 +2389,6 @@ TEST_P(CypherMainVisitorTest, TestProfileAuthQuery) { EXPECT_THROW(ast_generator.ParseQuery("PROFILE SHOW ROLES"), SyntaxException); } -TEST_P(CypherMainVisitorTest, TestProfileStreamQuery) { - auto &ast_generator = *GetParam(); - EXPECT_THROW(ast_generator.ParseQuery("PROFILE SHOW STREAMS"), - SyntaxException); -} - TEST_P(CypherMainVisitorTest, TestShowStorageInfo) { auto &ast_generator = *GetParam(); auto *query = diff --git a/tests/unit/query_common.hpp b/tests/unit/query_common.hpp index 1dea3810a..a6b7c7f36 100644 --- a/tests/unit/query_common.hpp +++ b/tests/unit/query_common.hpp @@ -652,37 +652,3 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match, storage.Create((action), (user), (role), (user_or_role), \ password, (privileges)) #define DROP_USER(usernames) storage.Create((usernames)) -#define CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri, \ - batch_interval, batch_size) \ - storage.Create( \ - query::StreamQuery::Action::CREATE_STREAM, (stream_name), \ - LITERAL(stream_uri), LITERAL(stream_topic), LITERAL(transform_uri), \ - (batch_interval), (batch_size), nullptr) -#define DROP_STREAM(stream_name) \ - storage.Create(query::StreamQuery::Action::DROP_STREAM, \ - (stream_name), nullptr, nullptr, nullptr, \ - nullptr, nullptr, nullptr) -#define SHOW_STREAMS \ - storage.Create(query::StreamQuery::Action::SHOW_STREAMS, \ - "", nullptr, nullptr, nullptr, nullptr, \ - nullptr, nullptr) -#define START_STREAM(stream_name, limit_batches) \ - storage.Create(query::StreamQuery::Action::START_STREAM, \ - (stream_name), nullptr, nullptr, nullptr, \ - nullptr, nullptr, (limit_batches)) -#define STOP_STREAM(stream_name) \ - storage.Create(query::StreamQuery::Action::STOP_STREAM, \ - (stream_name), nullptr, nullptr, nullptr, \ - nullptr, nullptr, nullptr) -#define START_ALL_STREAMS \ - storage.Create( \ - query::StreamQuery::Action::START_ALL_STREAMS, "", nullptr, nullptr, \ - nullptr, nullptr, nullptr, nullptr) -#define STOP_ALL_STREAMS \ - storage.Create( \ - query::StreamQuery::Action::STOP_ALL_STREAMS, "", nullptr, nullptr, \ - nullptr, nullptr, nullptr, nullptr) -#define TEST_STREAM(stream_name, limit_batches) \ - storage.Create(query::StreamQuery::Action::TEST_STREAM, \ - (stream_name), nullptr, nullptr, nullptr, \ - nullptr, nullptr, (limit_batches)) diff --git a/tests/unit/query_required_privileges.cpp b/tests/unit/query_required_privileges.cpp index 414661b16..83725a37b 100644 --- a/tests/unit/query_required_privileges.cpp +++ b/tests/unit/query_required_privileges.cpp @@ -111,28 +111,6 @@ TEST_F(TestPrivilegeExtractor, AuthQuery) { UnorderedElementsAre(AuthQuery::Privilege::AUTH)); } -TEST_F(TestPrivilegeExtractor, StreamQuery) { - std::string stream_name("kafka"); - std::string stream_uri("localhost:1234"); - std::string stream_topic("tropik"); - std::string transform_uri("localhost:1234/file.py"); - - std::vector stream_queries = { - CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri, - nullptr, nullptr), - DROP_STREAM(stream_name), - SHOW_STREAMS, - START_STREAM(stream_name, nullptr), - STOP_STREAM(stream_name), - START_ALL_STREAMS, - STOP_ALL_STREAMS}; - - for (auto *query : stream_queries) { - EXPECT_THAT(GetRequiredPrivileges(query), - UnorderedElementsAre(AuthQuery::Privilege::STREAM)); - } -} - TEST_F(TestPrivilegeExtractor, ShowIndexInfo) { auto *query = storage.Create(); 
   query->info_type_ = InfoQuery::InfoType::INDEX;