Remove Kafka integration implementation and tests

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2525
Matej Ferencevic 2019-10-28 11:11:00 +01:00
parent 875a4a8629
commit 42516afce8
46 changed files with 10 additions and 3493 deletions

View File

@ -205,22 +205,6 @@ import_external_library(rocksdb STATIC
-DCMAKE_SKIP_INSTALL_ALL_DEPENDENCY=true
BUILD_COMMAND $(MAKE) rocksdb)
# Setup librdkafka.
import_external_library(librdkafka STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include/librdkafka
CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON
-DRDKAFKA_BUILD_EXAMPLES=OFF
-DRDKAFKA_BUILD_TESTS=OFF
-DCMAKE_INSTALL_LIBDIR=lib
-DWITH_SSL=ON
# If we want SASL, we need to install it on build machines
-DWITH_SASL=OFF)
import_library(librdkafka++ STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a
librdkafka-proj)
# Setup libbcrypt
import_external_library(libbcrypt STATIC
${CMAKE_CURRENT_SOURCE_DIR}/libbcrypt/bcrypt.a

View File

@ -136,11 +136,6 @@ sed -i 's/-Wshadow/-Wno-defaulted-function-deleted/' rocksdb/CMakeLists.txt
# remove shared library from install dependencies
sed -i 's/TARGETS ${ROCKSDB_SHARED_LIB}/TARGETS ${ROCKSDB_SHARED_LIB} OPTIONAL/' rocksdb/CMakeLists.txt
# kafka
kafka_tag="c319b4e987d0bc4fe4f01cf91419d90b62061655" # Mar 8, 2018
# git clone https://github.com/edenhill/librdkafka.git
clone git://deps.memgraph.io/librdkafka.git librdkafka $kafka_tag
# mgclient
mgclient_tag="fe94b3631385ef5dbe40a3d8458860dbcc33e6ea" # May 27, 2019
# git clone https://github.com/memgraph/mgclient.git

View File

@ -4,7 +4,6 @@
add_subdirectory(lisp)
add_subdirectory(utils)
add_subdirectory(requests)
add_subdirectory(integrations)
add_subdirectory(io)
add_subdirectory(telemetry)
add_subdirectory(communication)
@ -88,7 +87,7 @@ set(MG_SINGLE_NODE_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags
mg-utils mg-io mg-requests mg-communication)
# These are enterprise subsystems
set(MG_SINGLE_NODE_LIBS ${MG_SINGLE_NODE_LIBS} mg-integrations-kafka mg-auth)
set(MG_SINGLE_NODE_LIBS ${MG_SINGLE_NODE_LIBS} mg-auth)
if (USE_LTALLOC)
list(APPEND MG_SINGLE_NODE_LIBS ltalloc)
@ -146,7 +145,7 @@ set(MG_SINGLE_NODE_V2_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags mg-storage-v2
mg-utils mg-io mg-requests mg-communication)
# These are enterprise subsystems
set(MG_SINGLE_NODE_V2_LIBS ${MG_SINGLE_NODE_V2_LIBS} mg-integrations-kafka mg-auth)
set(MG_SINGLE_NODE_V2_LIBS ${MG_SINGLE_NODE_V2_LIBS} mg-auth)
if (USE_LTALLOC)
list(APPEND MG_SINGLE_NODE_V2_LIBS ltalloc)
@ -230,7 +229,7 @@ add_custom_target(generate_lcp_single_node_ha DEPENDS generate_lcp_common ${gene
set(MG_SINGLE_NODE_HA_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags
mg-utils mg-io mg-integrations-kafka mg-requests mg-communication mg-comm-rpc
mg-utils mg-io mg-requests mg-communication mg-comm-rpc
mg-auth)
if (USE_LTALLOC)

View File

@ -42,8 +42,6 @@ std::string PermissionToString(Permission permission) {
return "DUMP";
case Permission::AUTH:
return "AUTH";
case Permission::STREAM:
return "STREAM";
}
}

View File

@ -21,7 +21,6 @@ enum class Permission : uint64_t {
CONSTRAINT = 0x00000100,
DUMP = 0x00000200,
AUTH = 0x00010000,
STREAM = 0x00020000,
};
// Constant list of all available permissions.
@ -29,7 +28,7 @@ const std::vector<Permission> kPermissionsAll = {
Permission::MATCH, Permission::CREATE, Permission::MERGE,
Permission::DELETE, Permission::SET, Permission::REMOVE,
Permission::INDEX, Permission::STATS, Permission::CONSTRAINT,
Permission::DUMP, Permission::AUTH, Permission::STREAM};
Permission::DUMP, Permission::AUTH};
// Function that converts a permission to its string representation.
std::string PermissionToString(Permission permission);

View File

@ -26,8 +26,6 @@ auth::Permission PrivilegeToPermission(query::AuthQuery::Privilege privilege) {
return auth::Permission::DUMP;
case query::AuthQuery::Privilege::AUTH:
return auth::Permission::AUTH;
case query::AuthQuery::Privilege::STREAM:
return auth::Permission::STREAM;
}
}
}

View File

@ -1,2 +0,0 @@
# kafka integration
add_subdirectory(kafka)

View File

@ -1,21 +0,0 @@
set(integrations_kafka_src_files
consumer.cpp
transform.cpp
streams.cpp)
find_package(Seccomp)
if (NOT SECCOMP_FOUND)
message(FATAL_ERROR "Couldn't find seccomp library!")
endif()
add_library(mg-integrations-kafka STATIC ${integrations_kafka_src_files})
target_link_libraries(mg-integrations-kafka stdc++fs Threads::Threads fmt
glog gflags librdkafka++ librdkafka zlib json)
target_link_libraries(mg-integrations-kafka mg-utils
mg-requests mg-communication)
target_link_libraries(mg-integrations-kafka ${Seccomp_LIBRARIES})
target_include_directories(mg-integrations-kafka SYSTEM PUBLIC ${Seccomp_INCLUDE_DIRS})
# Copy kafka.py to the root of our build directory where memgraph executable should be
configure_file(kafka.py ${CMAKE_BINARY_DIR} COPYONLY)

View File

@ -1,334 +0,0 @@
#include "integrations/kafka/consumer.hpp"
#include <chrono>
#include <thread>
#include "glog/logging.h"
#include "integrations/kafka/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/thread.hpp"
namespace integrations::kafka {
using namespace std::chrono_literals;
constexpr int64_t kDefaultBatchIntervalMillis = 100;
constexpr int64_t kDefaultBatchSize = 1000;
constexpr int64_t kDefaultTestBatchLimit = 1;
void Consumer::event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::Type::EVENT_ERROR:
LOG(WARNING) << "[Kafka] stream " << info_.stream_name << " ERROR ("
<< RdKafka::err2str(event.err()) << "): " << event.str();
break;
default:
break;
}
}
Consumer::Consumer(
const StreamInfo &info, const std::string &transform_script_path,
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer)
: info_(info),
transform_script_path_(transform_script_path),
stream_writer_(stream_writer) {
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::string error;
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("enable.partition.eof", "false", error) !=
RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("bootstrap.servers", info_.stream_uri, error) !=
RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("group.id", "mg", error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
consumer_ = std::unique_ptr<RdKafka::KafkaConsumer,
std::function<void(RdKafka::KafkaConsumer *)>>(
RdKafka::KafkaConsumer::create(conf.get(), error),
[this](auto *consumer) {
this->StopConsuming();
consumer->close();
delete consumer;
});
if (!consumer_) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
// Try fetching metadata first and check if topic exists.
RdKafka::ErrorCode err;
RdKafka::Metadata *raw_metadata = nullptr;
err = consumer_->metadata(true, nullptr, &raw_metadata, 1000);
std::unique_ptr<RdKafka::Metadata> metadata(raw_metadata);
if (err != RdKafka::ERR_NO_ERROR) {
throw ConsumerFailedToInitializeException(info_.stream_name,
RdKafka::err2str(err));
}
bool topic_found = false;
for (const auto &topic_metadata : *metadata->topics()) {
if (topic_metadata->topic() == info_.stream_topic) {
topic_found = true;
break;
}
}
if (!topic_found) {
throw TopicNotFoundException(info_.stream_name);
}
err = consumer_->subscribe({info_.stream_topic});
if (err != RdKafka::ERR_NO_ERROR) {
throw ConsumerFailedToInitializeException(info_.stream_name,
RdKafka::err2str(err));
}
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();
// Set limit_batches to nullopt since it's not running anymore.
info_.limit_batches = std::nullopt;
}
void Consumer::StartConsuming(std::optional<int64_t> limit_batches) {
info_.limit_batches = limit_batches;
is_running_.store(true);
thread_ = std::thread([this, limit_batches]() {
utils::ThreadSetName("StreamKafka");
int64_t batch_count = 0;
Transform transform(transform_script_path_);
transform_alive_.store(false);
if (!transform.Start()) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " couldn't start the transform script!";
return;
}
transform_alive_.store(true);
while (is_running_) {
auto batch = this->GetBatch();
if (batch.empty()) continue;
DLOG(INFO) << "[Kafka] stream " << info_.stream_name
<< " processing a batch";
// All exceptions that could be possibly thrown by the `Apply` function
// must be handled here because they *will* crash the database if
// uncaught!
// TODO (mferencevic): Figure out what to do with all other exceptions.
try {
transform.Apply(batch, stream_writer_);
} catch (const TransformExecutionException &) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process has died!";
break;
} catch (const utils::BasicException &e) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process received an exception: "
<< e.what();
break;
}
if (limit_batches != std::nullopt) {
if (limit_batches <= ++batch_count) {
is_running_.store(false);
break;
}
}
}
transform_alive_.store(false);
});
}
std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
std::vector<std::unique_ptr<RdKafka::Message>> batch;
auto start = std::chrono::system_clock::now();
int64_t remaining_timeout_in_ms =
info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
int64_t batch_size = info_.batch_size.value_or(kDefaultBatchSize);
batch.reserve(batch_size);
bool run_batch = true;
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) {
std::unique_ptr<RdKafka::Message> msg(
consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
run_batch = false;
break;
case RdKafka::ERR_NO_ERROR:
batch.emplace_back(std::move(msg));
break;
default:
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " consumer error: " << msg->errstr();
run_batch = false;
is_running_.store(false);
break;
}
if (!run_batch) {
break;
}
auto now = std::chrono::system_clock::now();
auto took =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
return batch;
}
void Consumer::Start(std::optional<int64_t> limit_batches) {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
throw ConsumerRunningException(info_.stream_name);
}
StartConsuming(limit_batches);
}
void Consumer::Stop() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (!is_running_) {
throw ConsumerStoppedException(info_.stream_name);
}
StopConsuming();
}
void Consumer::StartIfStopped() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (!is_running_) {
StartConsuming(std::nullopt);
}
}
void Consumer::StopIfRunning() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
StopConsuming();
}
}
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Consumer::Test(std::optional<int64_t> limit_batches) {
// All exceptions thrown here are handled by the Bolt protocol.
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
throw ConsumerRunningException(info_.stream_name);
}
Transform transform(transform_script_path_);
int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
results;
is_running_.store(true);
utils::OnScopeExit cleanup([this]() { is_running_.store(false); });
transform_alive_.store(false);
if (!transform.Start()) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " couldn't start the transform script!";
throw TransformExecutionException("Couldn't start the transform script!");
}
transform_alive_.store(true);
for (int64_t i = 0; i < num_of_batches; ++i) {
auto batch = GetBatch();
// Exceptions thrown by `Apply` are handled in Bolt.
// Wrap the `TransformExecutionException` into a new exception with a less
// specific message so the user doesn't get confused.
try {
transform.Apply(
batch,
[&results](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
results.push_back({query, params});
});
} catch (const TransformExecutionException &) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process has died!";
throw TransformExecutionException(
"The transform script contains a runtime error!");
}
}
transform_alive_.store(false);
return results;
}
StreamStatus Consumer::Status() {
StreamStatus ret;
ret.stream_name = info_.stream_name;
ret.stream_uri = info_.stream_uri;
ret.stream_topic = info_.stream_topic;
ret.transform_uri = info_.transform_uri;
if (!is_running_) {
ret.stream_status = "stopped";
} else if (!transform_alive_) {
ret.stream_status = "error";
} else {
ret.stream_status = "running";
}
return ret;
}
StreamInfo Consumer::Info() {
info_.is_running = is_running_;
return info_;
}
} // namespace integrations::kafka

View File

@ -1,144 +0,0 @@
/// @file
#pragma once
#include <atomic>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector>
#include "rdkafkacpp.h"
#include "communication/bolt/v1/value.hpp"
#include "integrations/kafka/transform.hpp"
namespace integrations {
namespace kafka {
/// StreamInfo holds all important info about a stream for memgraph.
///
/// The fields inside this struct are used for serialization and
/// deserialization.
struct StreamInfo {
std::string stream_name;
std::string stream_uri;
std::string stream_topic;
std::string transform_uri;
std::optional<int64_t> batch_interval_in_ms;
std::optional<int64_t> batch_size;
std::optional<int64_t> limit_batches;
bool is_running = false;
};
/// StreamStatus holds all important info about a stream for a user.
struct StreamStatus {
std::string stream_name;
std::string stream_uri;
std::string stream_topic;
std::string transform_uri;
std::string stream_status;
};
/// Memgraph's Kafka consumer wrapper.
///
/// The Consumer class wraps the librdkafka consumer to make it easier to use.
/// It extends RdKafka::EventCb in order to listen to error events.
class Consumer final : public RdKafka::EventCb {
public:
Consumer() = delete;
/// Creates a new consumer with the given parameters.
///
/// @param info necessary info about a stream
/// @param transform_script_path path on the filesystem where the transform
/// script is stored
/// @param stream_writer custom lambda that knows how to write data to the
/// db
///
/// @throws ConsumerFailedToInitializeException if the consumer can't connect
/// to the Kafka endpoint.
Consumer(const StreamInfo &info, const std::string &transform_script_path,
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer);
Consumer(const Consumer &other) = delete;
Consumer(Consumer &&other) = delete;
Consumer &operator=(const Consumer &other) = delete;
Consumer &operator=(Consumer &&other) = delete;
/// Starts importing data from a stream to the db.
/// This method will start a new thread which does the import.
///
/// @param limit_batches if present, the consumer will only import the given
/// number of batches in the db, and stop afterwards.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerRunningException if the consumer is already running
void Start(std::optional<int64_t> limit_batches);
/// Stops importing data from a stream to the db.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerStoppedException if the consumer is already stopped
void Stop();
/// Starts importing from a stream only if the stream is stopped.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StartIfStopped();
/// Stops importing from a stream only if the stream is running.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StopIfRunning();
/// Performs a dry-run on a given stream.
///
/// @param limit_batches the consumer will only test on the given number of
/// batches. If not present, a default value is used.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerRunningException if the consumer is already running.
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(std::optional<int64_t> limit_batches);
/// Returns the current status of a stream.
StreamStatus Status();
/// Returns the info of a stream.
StreamInfo Info();
private:
StreamInfo info_;
std::string transform_script_path_;
std::function<void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer_;
std::atomic<bool> is_running_{false};
std::atomic<bool> transform_alive_{false};
std::thread thread_;
std::unique_ptr<RdKafka::KafkaConsumer,
std::function<void(RdKafka::KafkaConsumer *)>>
consumer_;
void event_cb(RdKafka::Event &event) override;
void StopConsuming();
void StartConsuming(std::optional<int64_t> limit_batches);
std::vector<std::unique_ptr<RdKafka::Message>> GetBatch();
};
} // namespace kafka
} // namespace integrations

View File

@ -1,124 +0,0 @@
#pragma once
#include "utils/exceptions.hpp"
#include <fmt/format.h>
namespace integrations::kafka {
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
class StreamExistsException : public KafkaStreamException {
public:
explicit StreamExistsException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} already exists.", stream_name)) {}
};
class StreamDoesntExistException : public KafkaStreamException {
public:
explicit StreamDoesntExistException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} doesn't exist.", stream_name)) {}
};
class StreamSerializationException : public KafkaStreamException {
public:
StreamSerializationException()
: KafkaStreamException("Failed to serialize stream data!") {}
};
class StreamDeserializationException : public KafkaStreamException {
public:
StreamDeserializationException()
: KafkaStreamException("Failed to deserialize stream data!") {}
};
class StreamMetadataCouldNotBeStored : public KafkaStreamException {
public:
explicit StreamMetadataCouldNotBeStored(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't persist stream metadata for stream {}", stream_name)) {}
};
class StreamMetadataCouldNotBeDeleted : public KafkaStreamException {
public:
explicit StreamMetadataCouldNotBeDeleted(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't delete persisted stream metadata for stream {}",
stream_name)) {}
};
class ConsumerFailedToInitializeException : public KafkaStreamException {
public:
ConsumerFailedToInitializeException(const std::string &stream_name,
const std::string &error)
: KafkaStreamException(fmt::format(
"Failed to initialize kafka stream {} : {}", stream_name, error)) {}
};
class ConsumerNotAvailableException : public KafkaStreamException {
public:
explicit ConsumerNotAvailableException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} not available", stream_name)) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
explicit ConsumerRunningException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already running", stream_name)) {}
};
class ConsumerStoppedException : public KafkaStreamException {
public:
explicit ConsumerStoppedException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already stopped", stream_name)) {}
};
class TopicNotFoundException : public KafkaStreamException {
public:
explicit TopicNotFoundException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {}, topic not found", stream_name)) {}
};
class TransformScriptNotFoundException : public KafkaStreamException {
public:
explicit TransformScriptNotFoundException(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't find transform script for {}", stream_name)) {}
};
class TransformScriptDownloadException : public KafkaStreamException {
public:
explicit TransformScriptDownloadException(const std::string &transform_uri)
: KafkaStreamException(fmt::format(
"Couldn't get the transform script from {}", transform_uri)) {}
};
class TransformScriptCouldNotBeCreatedException : public KafkaStreamException {
public:
explicit TransformScriptCouldNotBeCreatedException(
const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't create transform script for stream {}", stream_name)) {}
};
class TransformScriptCouldNotBeDeletedException : public KafkaStreamException {
public:
explicit TransformScriptCouldNotBeDeletedException(
const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't delete transform script for stream {}", stream_name)) {}
};
class TransformExecutionException : public KafkaStreamException {
using KafkaStreamException::KafkaStreamException;
};
} // namespace integrations::kafka

View File

@ -1,162 +0,0 @@
#!/usr/bin/python3
import os
import struct
# Import the target transform script.
import transform
# Constants used for communicating with the memgraph process.
COMMUNICATION_TO_PYTHON_FD = 1000
COMMUNICATION_FROM_PYTHON_FD = 1002
# Functions used to get data from the memgraph process.
def get_data(num_bytes):
data = bytes()
while len(data) < num_bytes:
data += os.read(COMMUNICATION_TO_PYTHON_FD, num_bytes - len(data))
return data
def get_size():
fmt = "I" # uint32_t
data = get_data(struct.calcsize(fmt))
return struct.unpack(fmt, data)[0]
def get_batch():
batch = []
count = get_size()
for i in range(count):
size = get_size()
batch.append(get_data(size))
return batch
# Functions used to put data to the memgraph process.
TYPE_NONE = 0x10
TYPE_BOOL_FALSE = 0x20
TYPE_BOOL_TRUE = 0x21
TYPE_INT = 0x30
TYPE_FLOAT = 0x40
TYPE_STR = 0x50
TYPE_LIST = 0x60
TYPE_DICT = 0x70
def put_data(data):
written = 0
while written < len(data):
written += os.write(COMMUNICATION_FROM_PYTHON_FD, data[written:])
def put_size(size):
fmt = "I" # uint32_t
put_data(struct.pack(fmt, size))
def put_type(typ):
fmt = "B" # uint8_t
put_data(struct.pack(fmt, typ))
def put_string(value):
data = value.encode("utf-8")
put_size(len(data))
put_data(data)
def put_value(value, ids):
if value is None:
put_type(TYPE_NONE)
elif type(value) is bool:
if value:
put_type(TYPE_BOOL_TRUE)
else:
put_type(TYPE_BOOL_FALSE)
elif type(value) is int:
put_type(TYPE_INT)
put_data(struct.pack("q", value)) # int64_t
elif type(value) is float:
put_type(TYPE_FLOAT)
put_data(struct.pack("d", value)) # double
elif type(value) is str:
put_type(TYPE_STR)
put_string(value)
elif type(value) is list:
if id(value) in ids:
raise ValueError("Recursive objects are not supported!")
ids_new = ids + [id(value)]
put_type(TYPE_LIST)
put_size(len(value))
for item in value:
put_value(item, ids_new)
elif type(value) is dict:
if id(value) in ids:
raise ValueError("Recursive objects are not supported!")
ids_new = ids + [id(value)]
put_type(TYPE_DICT)
put_size(len(value))
for key, item in value.items():
if type(key) is not str:
raise TypeError("Dictionary keys must be strings!")
put_string(key)
put_value(item, ids_new)
else:
raise TypeError("Unsupported value type {}!".format(str(type(value))))
# Functions used to continuously process data.
def put_params(params):
if type(params) != dict:
raise TypeError("Parameters must be a dict!")
put_value(params, [])
class StreamError(Exception):
pass
def process_batch():
# Get the data that should be transformed.
batch = get_batch()
# Transform the data.
ret = transform.stream(batch)
# Sanity checks for the transformed data.
if type(ret) != list:
raise StreamError("The transformed items must be a list!")
for item in ret:
if type(item) not in [list, tuple]:
raise StreamError("The transformed item must be a tuple "
"or a list!")
if len(item) != 2:
raise StreamError("There must be exactly two elements in the "
"transformed item!")
if type(item[0]) != str:
raise StreamError("The first transformed element of an item "
"must be a string!")
if type(item[1]) != dict:
raise StreamError("The second transformed element of an item "
"must be a dictionary!")
# Send the items to the server.
put_size(len(ret))
for query, params in ret:
put_string(query)
put_params(params)
# Main entry point.
if __name__ == "__main__":
while True:
process_batch()
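
The helper above imports a user-supplied `transform` module and expects its `stream(batch)` function to return a list of (query, params) pairs, exactly as checked in `process_batch`. A minimal hypothetical `transform.py` that satisfies this contract could look as follows (the Cypher query and parameter names are illustrative only):

# transform.py -- hypothetical example of the module imported by kafka.py.
# Each Kafka message arrives as raw bytes; the script must return a list of
# (query, params) pairs, where params is a dict keyed by strings.

def stream(batch):
    transformed = []
    for message in batch:
        payload = message.decode("utf-8")
        transformed.append(
            ("CREATE (:Message {payload: $payload})", {"payload": payload}))
    return transformed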

View File

@ -1,274 +0,0 @@
#include "integrations/kafka/streams.hpp"
#include <cstdio>
#include <filesystem>
#include <optional>
#include <json/json.hpp>
#include "integrations/kafka/exceptions.hpp"
#include "requests/requests.hpp"
#include "utils/file.hpp"
namespace integrations::kafka {
namespace fs = std::filesystem;
const std::string kMetadataDir = "metadata";
const std::string kTransformDir = "transform";
const std::string kTransformExt = ".py";
namespace {
nlohmann::json Serialize(const StreamInfo &info) {
nlohmann::json data = nlohmann::json::object();
data["stream_name"] = info.stream_name;
data["stream_uri"] = info.stream_uri;
data["stream_topic"] = info.stream_topic;
data["transform_uri"] = info.transform_uri;
if (info.batch_interval_in_ms) {
data["batch_interval_in_ms"] = info.batch_interval_in_ms.value();
} else {
data["batch_interval_in_ms"] = nullptr;
}
if (info.batch_size) {
data["batch_size"] = info.batch_size.value();
} else {
data["batch_size"] = nullptr;
}
if (info.limit_batches) {
data["limit_batches"] = info.limit_batches.value();
} else {
data["limit_batches"] = nullptr;
}
data["is_running"] = info.is_running;
return data;
}
StreamInfo Deserialize(const nlohmann::json &data) {
if (!data.is_object()) throw StreamDeserializationException();
StreamInfo info;
if (!data["stream_name"].is_string()) throw StreamDeserializationException();
info.stream_name = data["stream_name"];
if (!data["stream_uri"].is_string()) throw StreamDeserializationException();
info.stream_uri = data["stream_uri"];
if (!data["stream_topic"].is_string()) throw StreamDeserializationException();
info.stream_topic = data["stream_topic"];
if (!data["transform_uri"].is_string())
throw StreamDeserializationException();
info.transform_uri = data["transform_uri"];
if (data["batch_interval_in_ms"].is_number()) {
info.batch_interval_in_ms = data["batch_interval_in_ms"];
} else if (data["batch_interval_in_ms"].is_null()) {
info.batch_interval_in_ms = std::nullopt;
} else {
throw StreamDeserializationException();
}
if (data["batch_size"].is_number()) {
info.batch_size = data["batch_size"];
} else if (data["batch_size"].is_null()) {
info.batch_size = std::nullopt;
} else {
throw StreamDeserializationException();
}
if (!data["is_running"].is_boolean()) throw StreamDeserializationException();
info.is_running = data["is_running"];
if (data["limit_batches"].is_number()) {
info.limit_batches = data["limit_batches"];
} else if (data["limit_batches"].is_null()) {
info.limit_batches = std::nullopt;
} else {
throw StreamDeserializationException();
}
return info;
}
} // namespace
Streams::Streams(const std::string &streams_directory,
std::function<void(
const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer)
: streams_directory_(streams_directory),
stream_writer_(stream_writer),
metadata_store_(fs::path(streams_directory) / kMetadataDir) {}
void Streams::Recover() {
for (auto it = metadata_store_.begin(); it != metadata_store_.end(); ++it) {
// Check if the transform script also exists;
auto transform_script = GetTransformScriptPath(it->first);
if (!fs::exists(transform_script))
throw TransformScriptNotFoundException(it->first);
nlohmann::json data;
try {
data = nlohmann::json::parse(it->second);
} catch (const nlohmann::json::parse_error &e) {
throw StreamDeserializationException();
}
StreamInfo info = Deserialize(data);
Create(info, false);
if (info.is_running) Start(info.stream_name, info.limit_batches);
}
}
void Streams::Create(const StreamInfo &info, bool download_transform_script) {
std::lock_guard<std::mutex> g(mutex_);
if (consumers_.find(info.stream_name) != consumers_.end())
throw StreamExistsException(info.stream_name);
// Make sure transform directory exists or we can create it.
if (!utils::EnsureDir(GetTransformScriptDir())) {
throw TransformScriptCouldNotBeCreatedException(info.stream_name);
}
// Download the transform script.
auto transform_script_path = GetTransformScriptPath(info.stream_name);
if (download_transform_script &&
!requests::CreateAndDownloadFile(info.transform_uri,
transform_script_path)) {
throw TransformScriptDownloadException(info.transform_uri);
}
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(info.stream_name, Serialize(info).dump())) {
throw StreamMetadataCouldNotBeStored(info.stream_name);
}
try {
consumers_.emplace(
std::piecewise_construct, std::forward_as_tuple(info.stream_name),
std::forward_as_tuple(info, transform_script_path, stream_writer_));
} catch (const KafkaStreamException &e) {
// If we failed to create the consumer, remove the persisted metadata.
metadata_store_.Delete(info.stream_name);
// Rethrow the exception.
throw;
}
}
void Streams::Drop(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
// Erase and implicitly stop the consumer.
consumers_.erase(find_it);
// Remove stream_info in metadata_store_.
if (!metadata_store_.Delete(stream_name)) {
throw StreamMetadataCouldNotBeDeleted(stream_name);
}
// Remove transform script.
if (std::remove(GetTransformScriptPath(stream_name).c_str())) {
throw TransformScriptNotFoundException(stream_name);
}
}
void Streams::Start(const std::string &stream_name,
std::optional<int64_t> limit_batches) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Start(limit_batches);
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
void Streams::Stop(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Stop();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
void Streams::StartAll() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StartIfStopped();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}
}
void Streams::StopAll() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StopIfRunning();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}
}
std::vector<StreamStatus> Streams::Show() {
std::vector<StreamStatus> streams;
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
streams.emplace_back(consumer_kv.second.Status());
}
return streams;
}
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Streams::Test(const std::string &stream_name,
std::optional<int64_t> limit_batches) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
return find_it->second.Test(limit_batches);
}
std::string Streams::GetTransformScriptDir() {
return fs::path(streams_directory_) / kTransformDir;
}
std::string Streams::GetTransformScriptPath(const std::string &stream_name) {
return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt);
}
} // namespace integrations::kafka
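
For reference, Serialize above persists each stream as a JSON object in the metadata KVStore, keyed by the stream name. A sketch of what one such entry could look like, built with Python's json module; the field names mirror Serialize(), while the concrete values are invented:

import json

# Hypothetical persisted entry for a stream named "purchases".
metadata = {
    "stream_name": "purchases",
    "stream_uri": "localhost:9092",
    "stream_topic": "purchases",
    "transform_uri": "http://localhost/transform.py",
    "batch_interval_in_ms": None,  # serialized as JSON null when not set
    "batch_size": None,
    "limit_batches": None,
    "is_running": False,
}

print(json.dumps(metadata))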

View File

@ -1,127 +0,0 @@
/// @file
#pragma once
#include <mutex>
#include <optional>
#include <unordered_map>
#include "integrations/kafka/consumer.hpp"
#include "storage/common/kvstore/kvstore.hpp"
namespace integrations::kafka {
/// Manages Kafka consumers.
///
/// This class is responsible for carrying out all stream actions that are
/// supported through queries.
class Streams final {
public:
/// Initialize streams.
///
/// @param streams_directory path on the filesystem where the streams metadata
/// will be persisted and where the transform scripts will be
/// downloaded
/// @param stream_writer lambda that knows how to write data to the db
Streams(const std::string &streams_directory,
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer);
/// Looks for persisted metadata and tries to recover consumers.
///
/// @throws TransformScriptNotFoundException if the transform script is
/// missing
/// @throws StreamDeserializationException if the metadata can't be recovered
void Recover();
/// Creates a new import stream.
/// This method makes sure there is no other stream with the same name,
/// downloads the given transform script and writes metadata to persisted
/// store.
///
/// @param info StreamInfo struct with necessary data for a kafka consumer.
/// @param download_transform_script Denote whether or not the transform
/// script should be downloaded.
///
/// @throws StreamExistsException if the stream with the same name exists
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
/// @throws TransformScriptCouldNotBeCreatedException if the script could not
/// be created
void Create(const StreamInfo &info, bool download_transform_script = true);
/// Deletes an existing stream and all the data that was persisted.
///
/// @param stream_name name of the stream that needs to be deleted.
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws StreamMetadataCouldNotBeDeleted if the persisted metadata can't be
/// deleted
/// @throws TransformScriptNotFoundException if the transform script can't be
/// deleted
void Drop(const std::string &stream_name);
/// Start consuming from a stream.
///
/// @param stream_name name of the stream we want to start consuming
/// @param batch_limit number of batches we want to import before stopping
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws ConsumerRunningException if the consumer is already running
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void Start(const std::string &stream_name,
std::optional<int64_t> batch_limit = std::nullopt);
/// Stop consuming from a stream.
///
/// @param stream_name name of the stream we want to stop consuming
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws ConsumerStoppedException if the consumer is already stopped
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void Stop(const std::string &stream_name);
/// Start consuming from all streams that are stopped.
///
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void StartAll();
/// Stop consuming from all streams that are running.
///
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void StopAll();
/// Return current status for all streams.
std::vector<StreamStatus> Show();
/// Do a dry-run consume from a stream.
///
/// @param stream_name name of the stream we want to test
/// @param batch_limit number of batches we want to test before stopping
///
/// @returns A vector of pairs consisting of the query (std::string) and its
/// parameters (std::map<std::string, communication::bolt::Value>).
///
/// @throws StreamDoesntExistException if the stream doesn't exist
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(const std::string &stream_name,
std::optional<int64_t> batch_limit = std::nullopt);
private:
std::string streams_directory_;
/// Custom lambda that "knows" how to execute queries.
std::function<void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer_;
/// Key value storage used as a persistent storage for stream metadata.
storage::KVStore metadata_store_;
std::mutex mutex_;
std::unordered_map<std::string, Consumer> consumers_;
std::string GetTransformScriptDir();
std::string GetTransformScriptPath(const std::string &stream_name);
};
} // namespace integrations::kafka

View File

@ -1,654 +0,0 @@
#include "integrations/kafka/transform.hpp"
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <thread>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <linux/limits.h>
#include <pwd.h>
#include <sched.h>
#include <seccomp.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/value.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "utils/file.hpp"
DEFINE_string(python_interpreter, "/usr/bin/python3",
"Path to the Python 3.x interpreter that should be used");
namespace {
///////////////////////
// Namespace shortcuts.
///////////////////////
using communication::bolt::Value;
using integrations::kafka::TargetArguments;
using integrations::kafka::TransformExecutionException;
namespace fs = std::filesystem;
/////////////////////////////////////////////////////////////////////////
// Constants used for starting and communicating with the target process.
/////////////////////////////////////////////////////////////////////////
const int kPipeReadEnd = 0;
const int kPipeWriteEnd = 1;
const int kCommunicationToPythonFd = 1000;
const int kCommunicationFromPythonFd = 1002;
const int kTerminateTimeoutSec = 5;
const std::string kHelperScriptName = "kafka.py";
const std::string kTransformScriptName = "transform.py";
////////////////////
// Helper functions.
////////////////////
fs::path GetTemporaryPath(pid_t pid) {
return fs::temp_directory_path() / "memgraph" /
fmt::format("transform_{}", pid);
}
fs::path GetHelperScriptPath() {
char path[PATH_MAX];
memset(path, 0, PATH_MAX);
auto ret = readlink("/proc/self/exe", path, PATH_MAX);
if (ret < 0) return "";
return fs::path() / std::string(dirname(path)) / kHelperScriptName;
}
std::string GetEnvironmentVariable(const std::string &name) {
char *value = secure_getenv(name.c_str());
if (value == nullptr) return "";
return {value};
}
///////////////////////////////////////////
// char** wrapper used for C library calls.
///////////////////////////////////////////
const int kCharppMaxElements = 20;
class CharPP final {
public:
CharPP() { memset(data_, 0, sizeof(char *) * kCharppMaxElements); }
~CharPP() {
for (size_t i = 0; i < size_; ++i) {
free(data_[i]);
}
}
CharPP(const CharPP &) = delete;
CharPP(CharPP &&) = delete;
CharPP &operator=(const CharPP &) = delete;
CharPP &operator=(CharPP &&) = delete;
void Add(const char *value) {
if (size_ == kCharppMaxElements) return;
int len = strlen(value);
char *item = (char *)malloc(sizeof(char) * (len + 1));
if (item == nullptr) return;
memcpy(item, value, len);
item[len] = 0;
data_[size_++] = item;
}
void Add(const std::string &value) { Add(value.c_str()); }
char **Get() { return data_; }
private:
char *data_[kCharppMaxElements];
size_t size_{0};
};
////////////////////////////////////
// Security functions and constants.
////////////////////////////////////
const std::vector<int> seccomp_syscalls_allowed = {
SCMP_SYS(read),
SCMP_SYS(write),
SCMP_SYS(close),
SCMP_SYS(stat),
SCMP_SYS(fstat),
SCMP_SYS(lstat),
SCMP_SYS(poll),
SCMP_SYS(lseek),
SCMP_SYS(mmap),
SCMP_SYS(mprotect),
SCMP_SYS(munmap),
SCMP_SYS(brk),
SCMP_SYS(rt_sigaction),
SCMP_SYS(rt_sigprocmask),
SCMP_SYS(rt_sigreturn),
SCMP_SYS(ioctl),
SCMP_SYS(pread64),
SCMP_SYS(pwrite64),
SCMP_SYS(readv),
SCMP_SYS(writev),
SCMP_SYS(access),
SCMP_SYS(select),
SCMP_SYS(mremap),
SCMP_SYS(msync),
SCMP_SYS(mincore),
SCMP_SYS(madvise),
SCMP_SYS(dup),
SCMP_SYS(dup2),
SCMP_SYS(pause),
SCMP_SYS(nanosleep),
SCMP_SYS(getpid),
SCMP_SYS(sendfile),
SCMP_SYS(execve),
SCMP_SYS(exit),
SCMP_SYS(uname),
SCMP_SYS(fcntl),
SCMP_SYS(fsync),
SCMP_SYS(fdatasync),
SCMP_SYS(getdents),
SCMP_SYS(getcwd),
SCMP_SYS(readlink),
SCMP_SYS(gettimeofday),
SCMP_SYS(getrlimit),
SCMP_SYS(getrusage),
SCMP_SYS(getuid),
SCMP_SYS(getgid),
SCMP_SYS(geteuid),
SCMP_SYS(getegid),
SCMP_SYS(getppid),
SCMP_SYS(getpgrp),
SCMP_SYS(rt_sigpending),
SCMP_SYS(rt_sigtimedwait),
SCMP_SYS(rt_sigsuspend),
SCMP_SYS(sched_setparam),
SCMP_SYS(mlock),
SCMP_SYS(munlock),
SCMP_SYS(mlockall),
SCMP_SYS(munlockall),
SCMP_SYS(arch_prctl),
SCMP_SYS(ioperm),
SCMP_SYS(time),
SCMP_SYS(futex),
SCMP_SYS(set_tid_address),
SCMP_SYS(clock_gettime),
SCMP_SYS(clock_getres),
SCMP_SYS(clock_nanosleep),
SCMP_SYS(exit_group),
SCMP_SYS(mbind),
SCMP_SYS(set_mempolicy),
SCMP_SYS(get_mempolicy),
SCMP_SYS(migrate_pages),
SCMP_SYS(openat),
SCMP_SYS(pselect6),
SCMP_SYS(ppoll),
SCMP_SYS(set_robust_list),
SCMP_SYS(get_robust_list),
SCMP_SYS(tee),
SCMP_SYS(move_pages),
SCMP_SYS(dup3),
SCMP_SYS(preadv),
SCMP_SYS(pwritev),
SCMP_SYS(getrandom),
SCMP_SYS(sigaltstack),
SCMP_SYS(gettid),
SCMP_SYS(tgkill),
SCMP_SYS(sysinfo),
};
bool SetupSeccomp() {
// Initialize the seccomp context.
scmp_filter_ctx ctx;
ctx = seccomp_init(SCMP_ACT_TRAP);
if (ctx == NULL) return false;
// First we deny access to the `open` system call called with `O_WRONLY`,
// `O_RDWR` and `O_CREAT`.
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY, O_WRONLY)) != 0) {
seccomp_release(ctx);
return false;
}
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_RDWR, O_RDWR)) != 0) {
seccomp_release(ctx);
return false;
}
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_CREAT, O_CREAT)) != 0) {
seccomp_release(ctx);
return false;
}
// Now we allow the `open` system call without the blocked flags.
if (seccomp_rule_add(
ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY | O_RDWR | O_CREAT, 0)) != 0) {
seccomp_release(ctx);
return false;
}
// Add all general allow rules.
for (auto syscall_num : seccomp_syscalls_allowed) {
if (seccomp_rule_add(ctx, SCMP_ACT_ALLOW, syscall_num, 0) != 0) {
seccomp_release(ctx);
return false;
}
}
// Load the context for the current process.
auto ret = seccomp_load(ctx);
// Free the context and return success/failure.
seccomp_release(ctx);
return ret == 0;
}
bool SetLimit(int resource, rlim_t n) {
struct rlimit limit;
limit.rlim_cur = limit.rlim_max = n;
return setrlimit(resource, &limit) == 0;
}
///////////////////////////////////////////////////////
// Target function used to start the transform process.
///////////////////////////////////////////////////////
int Target(void *arg) {
// NOTE: (D)LOG shouldn't be used here because it wasn't initialized in this
// process and something really bad could happen.
// Get a pointer to the passed arguments.
TargetArguments *ta = reinterpret_cast<TargetArguments *>(arg);
// Redirect `stdin` to `/dev/null`.
int fd = open("/dev/null", O_RDONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDIN_FILENO) != STDIN_FILENO) {
return EXIT_FAILURE;
}
// Redirect `stdout` to `/dev/null`.
fd = open("/dev/null", O_WRONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDOUT_FILENO) != STDOUT_FILENO) {
return EXIT_FAILURE;
}
// Redirect `stderr` to `/dev/null`.
fd = open("/dev/null", O_WRONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDERR_FILENO) != STDERR_FILENO) {
return EXIT_FAILURE;
}
// Create the working directory.
fs::path working_path = GetTemporaryPath(getpid());
utils::DeleteDir(working_path);
if (!utils::EnsureDir(working_path)) {
return EXIT_FAILURE;
}
// Copy all scripts to the working directory.
if (!utils::CopyFile(GetHelperScriptPath(),
working_path / kHelperScriptName)) {
return EXIT_FAILURE;
}
if (!utils::CopyFile(ta->transform_script_path,
working_path / kTransformScriptName)) {
return EXIT_FAILURE;
}
// Change the current directory to the working directory.
if (chdir(working_path.c_str()) != 0) {
return EXIT_FAILURE;
}
// Create the executable CharPP object.
CharPP exe;
exe.Add(FLAGS_python_interpreter);
exe.Add(kHelperScriptName);
// Create the environment CharPP object.
CharPP env;
env.Add(fmt::format("PATH={}", GetEnvironmentVariable("PATH")));
// TODO (mferencevic): Change this to the effective user.
env.Add(fmt::format("USER={}", GetEnvironmentVariable("USER")));
env.Add(fmt::format("HOME={}", working_path));
env.Add("LANG=en_US.utf8");
env.Add("LANGUAGE=en_US:en");
env.Add("PYTHONUNBUFFERED=1");
env.Add("PYTHONIOENCODING=utf-8");
env.Add("PYTHONDONTWRITEBYTECODE=1");
// Connect the communication input pipe.
if (dup2(ta->pipe_to_python, kCommunicationToPythonFd) !=
kCommunicationToPythonFd) {
return EXIT_FAILURE;
}
// Connect the communication output pipe.
if (dup2(ta->pipe_from_python, kCommunicationFromPythonFd) !=
kCommunicationFromPythonFd) {
return EXIT_FAILURE;
}
// Set process limits.
// Disable core dumps.
if (!SetLimit(RLIMIT_CORE, 0)) {
return EXIT_FAILURE;
}
// Disable file creation.
if (!SetLimit(RLIMIT_FSIZE, 0)) {
return EXIT_FAILURE;
}
// Set process number limit.
if (!SetLimit(RLIMIT_NPROC, 0)) {
return EXIT_FAILURE;
}
// TODO (mferencevic): Change the user to `nobody`.
// Setup seccomp.
if (!SetupSeccomp()) {
return EXIT_FAILURE;
}
execve(*exe.Get(), exe.Get(), env.Get());
// TODO (mferencevic): Log an error with `errno` about what failed.
return EXIT_FAILURE;
}
/////////////////////////////////////////////////////////////
// Functions used to send data to the started Python process.
/////////////////////////////////////////////////////////////
/// The data that is being sent to the Python process is always a
/// `std::vector<uint8_t[]>` of data. It is sent in the following way:
///
/// uint32_t number of elements being sent
/// uint32_t element 0 size
/// uint8_t[] element 0 data
/// uint32_t element 1 size
/// uint8_t[] element 1 data
/// ...
///
/// The receiving end of the protocol is implemented in `kafka.py`.
void PutData(int fd, const uint8_t *data, uint32_t size) {
int ret = 0;
uint32_t put = 0;
while (put < size) {
ret = write(fd, data + put, size - put);
if (ret > 0) {
put += ret;
} else if (ret == 0) {
throw TransformExecutionException(
"The communication pipe to the transform process was closed!");
} else if (errno != EINTR) {
throw TransformExecutionException(
"Couldn't put data to the transfrom process!");
}
}
}
void PutSize(int fd, uint32_t size) {
PutData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
}
//////////////////////////////////////////////////////////////
// Functions used to get data from the started Python process.
//////////////////////////////////////////////////////////////
/// The data that is being sent from the Python process is always a
/// `std::vector<std::pair<std::string, Value>>` of data (array of pairs of
/// query and params). It is sent in the following way:
///
/// uint32_t number of elements being sent
/// uint32_t element 0 query size
/// char[] element 0 query data
/// data[] element 0 params
/// uint32_t element 1 query size
/// char[] element 1 query data
/// data[] element 1 params
/// ...
///
/// When sending the query parameters they have to be further encoded to enable
/// sending of None, Bool, Int, Float, Str, List and Dict objects. The encoding
/// is as follows:
///
/// None: uint8_t type (kTypeNone)
/// Bool: uint8_t type (kTypeBoolFalse or kTypeBoolTrue)
/// Int: uint8_t type (kTypeInt), int64_t value
/// Float: uint8_t type (kTypeFloat), double value
/// Str: uint8_t type (kTypeStr), uint32_t size, char[] data
/// List: uint8_t type (kTypeList), uint32_t size, data[] element 0,
/// data[] element 1, ...
/// Dict: uint8_t type (kTypeDict), uint32_t size, uint32_t element 0 key size,
/// char[] element 0 key data, data[] element 0 value,
/// uint32_t element 1 key size, char[] element 1 key data,
/// data[] element 1 value, ...
///
/// The sending end of the protocol is implemented in `kafka.py`.
const uint8_t kTypeNone = 0x10;
const uint8_t kTypeBoolFalse = 0x20;
const uint8_t kTypeBoolTrue = 0x21;
const uint8_t kTypeInt = 0x30;
const uint8_t kTypeFloat = 0x40;
const uint8_t kTypeStr = 0x50;
const uint8_t kTypeList = 0x60;
const uint8_t kTypeDict = 0x70;
void GetData(int fd, uint8_t *data, uint32_t size) {
int ret = 0;
uint32_t got = 0;
while (got < size) {
ret = read(fd, data + got, size - got);
if (ret > 0) {
got += ret;
} else if (ret == 0) {
throw TransformExecutionException(
"The communication pipe from the transform process was closed!");
} else if (errno != EINTR) {
throw TransformExecutionException(
"Couldn't get data from the transform process!");
}
}
}
uint32_t GetSize(int fd) {
uint32_t size = 0;
GetData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
return size;
}
void GetString(int fd, std::string *value) {
const int kMaxStackBuffer = 8192;
uint8_t buffer[kMaxStackBuffer];
uint32_t size = GetSize(fd);
if (size < kMaxStackBuffer) {
GetData(fd, buffer, size);
*value = std::string(reinterpret_cast<char *>(buffer), size);
} else {
std::unique_ptr<uint8_t[]> tmp(new uint8_t[size]);
GetData(fd, tmp.get(), size);
*value = std::string(reinterpret_cast<char *>(tmp.get()), size);
}
}
void GetValue(int fd, Value *value) {
uint8_t type = 0;
GetData(fd, &type, sizeof(type));
if (type == kTypeNone) {
*value = Value();
} else if (type == kTypeBoolFalse) {
*value = Value(false);
} else if (type == kTypeBoolTrue) {
*value = Value(true);
} else if (type == kTypeInt) {
int64_t tmp = 0;
GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
*value = Value(tmp);
} else if (type == kTypeFloat) {
double tmp = 0.0;
GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
*value = Value(tmp);
} else if (type == kTypeStr) {
std::string tmp;
GetString(fd, &tmp);
*value = Value(tmp);
} else if (type == kTypeList) {
std::vector<Value> tmp_vec;
uint32_t size = GetSize(fd);
tmp_vec.reserve(size);
for (uint32_t i = 0; i < size; ++i) {
Value tmp_value;
GetValue(fd, &tmp_value);
tmp_vec.push_back(tmp_value);
}
*value = Value(tmp_vec);
} else if (type == kTypeDict) {
std::map<std::string, Value> tmp_map;
uint32_t size = GetSize(fd);
for (uint32_t i = 0; i < size; ++i) {
std::string tmp_key;
Value tmp_value;
GetString(fd, &tmp_key);
GetValue(fd, &tmp_value);
tmp_map.insert({tmp_key, tmp_value});
}
*value = Value(tmp_map);
} else {
throw TransformExecutionException(
fmt::format("Couldn't get value of unsupported type 0x{:02x}!", type));
}
}
} // namespace
namespace integrations::kafka {
Transform::Transform(const std::string &transform_script_path)
: transform_script_path_(transform_script_path) {}
bool Transform::Start() {
// Setup communication pipes.
if (pipe2(pipe_to_python_, O_CLOEXEC) != 0) {
DLOG(ERROR) << "Couldn't create communication pipe from cpp to python!";
return false;
}
if (pipe2(pipe_from_python_, O_CLOEXEC) != 0) {
DLOG(ERROR) << "Couldn't create communication pipe from python to cpp!";
return false;
}
// Find the top of the stack.
uint8_t *stack_top = stack_.get() + kStackSizeBytes;
// Set the target arguments.
target_arguments_->transform_script_path = transform_script_path_;
target_arguments_->pipe_to_python = pipe_to_python_[kPipeReadEnd];
target_arguments_->pipe_from_python = pipe_from_python_[kPipeWriteEnd];
// Create the process.
pid_ = clone(Target, stack_top, CLONE_VFORK, target_arguments_.get());
if (pid_ == -1) {
DLOG(ERROR) << "Couldn't create the communication process!";
return false;
}
// Close pipes that won't be used from the master process.
close(pipe_to_python_[kPipeReadEnd]);
close(pipe_from_python_[kPipeWriteEnd]);
return true;
}
void Transform::Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
std::function<void(const std::string &,
const std::map<std::string, Value> &)>
query_function) {
// Check that the process is alive.
if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) != 0) {
throw TransformExecutionException("The transform process has died!");
}
// Put the `batch` data to the transform process.
PutSize(pipe_to_python_[kPipeWriteEnd], batch.size());
for (const auto &item : batch) {
PutSize(pipe_to_python_[kPipeWriteEnd], item->len());
PutData(pipe_to_python_[kPipeWriteEnd],
reinterpret_cast<const uint8_t *>(item->payload()), item->len());
}
// Get `query` and `params` data from the transform process.
uint32_t size = GetSize(pipe_from_python_[kPipeReadEnd]);
for (uint32_t i = 0; i < size; ++i) {
std::string query;
Value params;
GetString(pipe_from_python_[kPipeReadEnd], &query);
GetValue(pipe_from_python_[kPipeReadEnd], &params);
query_function(query, params.ValueMap());
}
}
Transform::~Transform() {
// Try to terminate the process gracefully in `kTerminateTimeoutSec`.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i = 0; i < kTerminateTimeoutSec * 10; ++i) {
DLOG(INFO) << "Terminating the transform process with pid " << pid_;
kill(pid_, SIGTERM);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int ret = waitpid(pid_, &status_, WNOHANG | WUNTRACED);
if (ret == pid_ || ret == -1) {
break;
}
}
// If the process is still alive, kill it and wait for it to die.
if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) == 0) {
DLOG(WARNING) << "Killing the transform process with pid " << pid_;
kill(pid_, SIGKILL);
waitpid(pid_, &status_, 0);
}
// Delete the working directory.
if (pid_ != -1) {
utils::DeleteDir(GetTemporaryPath(pid_));
}
// Close leftover open pipes.
// We have to be careful to close only the leftover open pipes (the
// pipe_to_python WriteEnd and pipe_from_python ReadEnd), the other two ends
// were closed in the function that created them because they aren't used from
// the master process (they are only used from the Python process).
close(pipe_to_python_[kPipeWriteEnd]);
close(pipe_from_python_[kPipeReadEnd]);
}
} // namespace integrations::kafka
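
To make the result protocol described above concrete, the following sketch mirrors the put_* helpers from kafka.py and prints the exact bytes produced by a single (query, params) pair; the query text and the parameter are invented, and sizes use the platform-native uint32_t layout, just as kafka.py does:

import struct

TYPE_INT = 0x30
TYPE_DICT = 0x70

buf = bytearray()

def put_size(size):
    buf.extend(struct.pack("I", size))   # uint32_t size prefix

def put_string(value):
    data = value.encode("utf-8")
    put_size(len(data))
    buf.extend(data)

# One transformed item: the query string followed by its encoded params dict.
put_size(1)                              # number of (query, params) pairs
put_string("CREATE (:Event {n: $n})")    # element 0: query
buf.extend(struct.pack("B", TYPE_DICT))  # element 0: params is a dict ...
put_size(1)                              # ... with a single entry
put_string("n")                          # key "n"
buf.extend(struct.pack("B", TYPE_INT))   # value: int64_t 42
buf.extend(struct.pack("q", 42))

print(buf.hex())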

View File

@ -1,65 +0,0 @@
/// @file
#pragma once
#include <filesystem>
#include <map>
#include <string>
#include "rdkafkacpp.h"
#include "communication/bolt/v1/value.hpp"
namespace integrations::kafka {
struct TargetArguments {
std::filesystem::path transform_script_path;
int pipe_to_python{-1};
int pipe_from_python{-1};
};
/// Wrapper around the transform script for a stream.
class Transform final {
private:
const int kStackSizeBytes = 262144;
public:
/// Creates a transform handle for an already downloaded script (the download
/// itself is done in Streams::Create).
///
/// @param transform_script_path path on the filesystem where the transform
/// script is stored
explicit Transform(const std::string &transform_script_path);
/// Starts the transform script.
///
/// @return bool True on success or False otherwise.
bool Start();
/// Transform the given batch of messages using the transform script.
///
/// @param batch kafka message batch
/// @param query_function callback invoked with each transformed
/// (query, params) pair
void Apply(const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
std::function<void(
const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
query_function);
~Transform();
private:
std::string transform_script_path_;
pid_t pid_{-1};
int status_{0};
// The stack used for the `clone` system call must be heap allocated.
std::unique_ptr<uint8_t[]> stack_{new uint8_t[kStackSizeBytes]};
// The target arguments passed to the new process must be heap allocated.
std::unique_ptr<TargetArguments> target_arguments_{new TargetArguments()};
int pipe_to_python_[2] = {-1, -1};
int pipe_from_python_[2] = {-1, -1};
};
} // namespace integrations::kafka

View File

@ -15,8 +15,6 @@
#else
#include "database/single_node/graph_db.hpp"
#endif
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "query/exceptions.hpp"
#include "telemetry/telemetry.hpp"
@ -100,23 +98,7 @@ void SingleNodeMain() {
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
integrations::kafka::Streams kafka_streams{
durability_directory / "streams",
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
KafkaStreamWriter(session_data, query, params);
}};
interpreter_context.auth = &auth;
interpreter_context.kafka_streams = &kafka_streams;
try {
// Recover possible streams.
kafka_streams.Recover();
} catch (const integrations::kafka::KafkaStreamException &e) {
LOG(ERROR) << e.what();
}
ServerContext context;
std::string service_name = "Bolt";


@ -165,28 +165,6 @@ void BoltSession::TypedValueResultStream::Result(
encoder_->MessageRecord(decoded_values);
}
void KafkaStreamWriter(
SessionData &session_data, const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
query::Interpreter interpreter(session_data.interpreter_context);
KafkaResultStream stream;
std::map<std::string, PropertyValue> params_pv;
for (const auto &kv : params)
params_pv.emplace(kv.first, glue::ToPropertyValue(kv.second));
try {
// NOTE: This potentially allows Kafka streams to execute transaction
// control queries. However, those will not really work as a new
// `Interpreter` instance is created upon every call to this function,
// meaning any multicommand transaction state is lost.
interpreter.Interpret(query, params_pv);
interpreter.PullAll(&stream);
} catch (const utils::BasicException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
}
};
// Needed to correctly handle memgraph destruction from a signal handler.
// Without having some sort of a flag, it is possible that a signal is handled
// when we are exiting main, inside destructors of database::GraphDb and


@ -100,20 +100,6 @@ class BoltSession final
io::network::Endpoint endpoint_;
};
/// Class that implements ResultStream API for Kafka.
///
/// Kafka doesn't need to stream the import results back to the client so we
/// don't need any functionality here.
class KafkaResultStream {
public:
void Result(const std::vector<query::TypedValue> &) {}
};
/// Writes data streamed from kafka to memgraph.
void KafkaStreamWriter(
SessionData &session_data, const std::string &query,
const std::map<std::string, communication::bolt::Value> &params);
/// Set up signal handlers and register `shutdown` on SIGTERM and SIGINT.
/// In most cases you don't have to call this. If you are using a custom server
/// startup function for `WithInit`, then you probably need to use this to


@ -2061,7 +2061,7 @@ cpp<#
show-users-for-role)
(:serialize))
(lcp:define-enum privilege
(create delete match merge set remove index stats auth stream constraint
(create delete match merge set remove index stats auth constraint
dump)
(:serialize))
#>cpp
@ -2096,64 +2096,10 @@ const std::vector<AuthQuery::Privilege> kPrivilegesAll = {
AuthQuery::Privilege::MATCH, AuthQuery::Privilege::MERGE,
AuthQuery::Privilege::SET, AuthQuery::Privilege::REMOVE,
AuthQuery::Privilege::INDEX, AuthQuery::Privilege::STATS,
AuthQuery::Privilege::AUTH, AuthQuery::Privilege::STREAM,
AuthQuery::Privilege::AUTH,
AuthQuery::Privilege::CONSTRAINT, AuthQuery::Privilege::DUMP};
cpp<#
(lcp:define-class stream-query (query)
((action "Action" :scope :public)
(stream-name "std::string" :scope :public)
(stream-uri "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(stream-topic "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(transform-uri "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch-interval-in-ms "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(batch-size "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression"))
(limit-batches "Expression *" :scope :public :initval "nullptr"
:slk-save #'slk-save-ast-pointer
:slk-load (slk-load-ast-pointer "Expression")))
(:public
(lcp:define-enum action
(create-stream drop-stream show-streams start-stream stop-stream
start-all-streams stop-all-streams test-stream)
(:serialize))
#>cpp
StreamQuery() = default;
DEFVISITABLE(QueryVisitor<void>);
cpp<#)
(:protected
#>cpp
StreamQuery(Action action, std::string stream_name, Expression *stream_uri,
Expression *stream_topic, Expression *transform_uri,
Expression *batch_interval_in_ms, Expression *batch_size,
Expression *limit_batches)
: action_(action),
stream_name_(std::move(stream_name)),
stream_uri_(stream_uri),
stream_topic_(stream_topic),
transform_uri_(transform_uri),
batch_interval_in_ms_(batch_interval_in_ms),
batch_size_(batch_size),
limit_batches_(limit_batches) {}
cpp<#)
(:private
#>cpp
friend class AstStorage;
cpp<#)
(:serialize (:slk))
(:clone))
(lcp:define-class info-query (query)
((info-type "InfoType" :scope :public))
(:public


@ -65,7 +65,6 @@ class AuthQuery;
class ExplainQuery;
class ProfileQuery;
class IndexQuery;
class StreamQuery;
class InfoQuery;
class ConstraintQuery;
class RegexMatch;
@ -113,7 +112,7 @@ class ExpressionVisitor
template <class TResult>
class QueryVisitor
: public ::utils::Visitor<TResult, CypherQuery, ExplainQuery, ProfileQuery,
IndexQuery, AuthQuery, StreamQuery, InfoQuery,
IndexQuery, AuthQuery, InfoQuery,
ConstraintQuery, DumpQuery> {};
} // namespace query


@ -178,15 +178,6 @@ antlrcpp::Any CypherMainVisitor::visitAuthQuery(
return auth_query;
}
antlrcpp::Any CypherMainVisitor::visitStreamQuery(
MemgraphCypher::StreamQueryContext *ctx) {
CHECK(ctx->children.size() == 1)
<< "StreamQuery should have exactly one child!";
auto *stream_query = ctx->children[0]->accept(this).as<StreamQuery *>();
query_ = stream_query;
return stream_query;
}
antlrcpp::Any CypherMainVisitor::visitDumpQuery(
MemgraphCypher::DumpQueryContext *ctx) {
auto *dump_query = storage_->Create<DumpQuery>();
@ -538,7 +529,6 @@ antlrcpp::Any CypherMainVisitor::visitPrivilege(
if (ctx->INDEX()) return AuthQuery::Privilege::INDEX;
if (ctx->STATS()) return AuthQuery::Privilege::STATS;
if (ctx->AUTH()) return AuthQuery::Privilege::AUTH;
if (ctx->STREAM()) return AuthQuery::Privilege::STREAM;
if (ctx->CONSTRAINT()) return AuthQuery::Privilege::CONSTRAINT;
LOG(FATAL) << "Should not get here - unknown privilege!";
}
@ -576,152 +566,6 @@ antlrcpp::Any CypherMainVisitor::visitShowUsersForRole(
return auth;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitCreateStream(
MemgraphCypher::CreateStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::CREATE_STREAM;
stream_query->stream_name_ = ctx->streamName()->getText();
if (!ctx->streamUri->StringLiteral()) {
throw SyntaxException("Stream URI should be a string literal.");
}
stream_query->stream_uri_ = ctx->streamUri->accept(this);
if (!ctx->streamTopic->StringLiteral()) {
throw SyntaxException("Topic should be a string literal.");
}
stream_query->stream_topic_ = ctx->streamTopic->accept(this);
if (!ctx->transformUri->StringLiteral()) {
throw SyntaxException("Transform URI should be a string literal.");
}
stream_query->transform_uri_ = ctx->transformUri->accept(this);
if (ctx->batchIntervalOption()) {
stream_query->batch_interval_in_ms_ =
ctx->batchIntervalOption()->accept(this);
}
if (ctx->batchSizeOption()) {
stream_query->batch_size_ = ctx->batchSizeOption()->accept(this);
}
return stream_query;
}
/**
* @return Expression*
*/
antlrcpp::Any CypherMainVisitor::visitBatchIntervalOption(
MemgraphCypher::BatchIntervalOptionContext *ctx) {
if (!ctx->literal()->numberLiteral() ||
!ctx->literal()->numberLiteral()->integerLiteral()) {
throw SyntaxException("Batch interval should be an integer.");
}
return ctx->literal()->accept(this);
}
/**
* @return Expression*
*/
antlrcpp::Any CypherMainVisitor::visitBatchSizeOption(
MemgraphCypher::BatchSizeOptionContext *ctx) {
if (!ctx->literal()->numberLiteral() ||
!ctx->literal()->numberLiteral()->integerLiteral()) {
throw SyntaxException("Batch size should be an integer.");
}
return ctx->literal()->accept(this);
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitDropStream(
MemgraphCypher::DropStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::DROP_STREAM;
stream_query->stream_name_ = ctx->streamName()->getText();
return stream_query;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitShowStreams(
MemgraphCypher::ShowStreamsContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::SHOW_STREAMS;
return stream_query;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitStartStream(
MemgraphCypher::StartStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::START_STREAM;
stream_query->stream_name_ = std::string(ctx->streamName()->getText());
if (ctx->limitBatchesOption()) {
stream_query->limit_batches_ = ctx->limitBatchesOption()->accept(this);
}
return stream_query;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitStopStream(
MemgraphCypher::StopStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::STOP_STREAM;
stream_query->stream_name_ = std::string(ctx->streamName()->getText());
return stream_query;
}
/**
* @return Expression*
*/
antlrcpp::Any CypherMainVisitor::visitLimitBatchesOption(
MemgraphCypher::LimitBatchesOptionContext *ctx) {
if (!ctx->literal()->numberLiteral() ||
!ctx->literal()->numberLiteral()->integerLiteral()) {
throw SyntaxException("Batch limit should be an integer.");
}
return ctx->literal()->accept(this);
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitStartAllStreams(
MemgraphCypher::StartAllStreamsContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::START_ALL_STREAMS;
return stream_query;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitStopAllStreams(
MemgraphCypher::StopAllStreamsContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::STOP_ALL_STREAMS;
return stream_query;
}
/**
* @return StreamQuery*
*/
antlrcpp::Any CypherMainVisitor::visitTestStream(
MemgraphCypher::TestStreamContext *ctx) {
auto *stream_query = storage_->Create<StreamQuery>();
stream_query->action_ = StreamQuery::Action::TEST_STREAM;
stream_query->stream_name_ = std::string(ctx->streamName()->getText());
if (ctx->limitBatchesOption()) {
stream_query->limit_batches_ = ctx->limitBatchesOption()->accept(this);
}
return stream_query;
}
antlrcpp::Any CypherMainVisitor::visitCypherReturn(
MemgraphCypher::CypherReturnContext *ctx) {
auto *return_clause = storage_->Create<Return>();


@ -181,12 +181,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
*/
antlrcpp::Any visitAuthQuery(MemgraphCypher::AuthQueryContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitStreamQuery(
MemgraphCypher::StreamQueryContext *ctx) override;
/**
* @return DumpQuery*
*/
@ -326,63 +320,6 @@ class CypherMainVisitor : public antlropencypher::MemgraphCypherBaseVisitor {
antlrcpp::Any visitShowUsersForRole(
MemgraphCypher::ShowUsersForRoleContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitCreateStream(
MemgraphCypher::CreateStreamContext *ctx) override;
antlrcpp::Any visitBatchIntervalOption(
MemgraphCypher::BatchIntervalOptionContext *ctx) override;
antlrcpp::Any visitBatchSizeOption(
MemgraphCypher::BatchSizeOptionContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitDropStream(
MemgraphCypher::DropStreamContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitShowStreams(
MemgraphCypher::ShowStreamsContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitStartStream(
MemgraphCypher::StartStreamContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitStopStream(
MemgraphCypher::StopStreamContext *ctx) override;
antlrcpp::Any visitLimitBatchesOption(
MemgraphCypher::LimitBatchesOptionContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitStartAllStreams(
MemgraphCypher::StartAllStreamsContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitStopAllStreams(
MemgraphCypher::StopAllStreamsContext *ctx) override;
/**
* @return StreamQuery*
*/
antlrcpp::Any visitTestStream(
MemgraphCypher::TestStreamContext *ctx) override;
/**
* @return Return*
*/


@ -9,10 +9,7 @@ import Cypher ;
memgraphCypherKeyword : cypherKeyword
| ALTER
| AUTH
| BATCH
| BATCHES
| CLEAR
| DATA
| DATABASE
| DENY
| DROP
@ -21,24 +18,13 @@ memgraphCypherKeyword : cypherKeyword
| FROM
| GRANT
| IDENTIFIED
| INTERVAL
| K_TEST
| KAFKA
| LOAD
| PASSWORD
| PRIVILEGES
| REVOKE
| ROLE
| ROLES
| SIZE
| START
| STATS
| STOP
| STREAM
| STREAMS
| TO
| TOPIC
| TRANSFORM
| USER
| USERS
;
@ -55,7 +41,6 @@ query : cypherQuery
| infoQuery
| constraintQuery
| authQuery
| streamQuery
| dumpQuery
;
@ -104,7 +89,7 @@ denyPrivilege : DENY ( ALL PRIVILEGES | privileges=privilegeList ) TO userOrRole
revokePrivilege : REVOKE ( ALL PRIVILEGES | privileges=privilegeList ) FROM userOrRole=userOrRoleName ;
privilege : CREATE | DELETE | MATCH | MERGE | SET
| REMOVE | INDEX | STATS | AUTH | STREAM | CONSTRAINT | DUMP ;
| REMOVE | INDEX | STATS | AUTH | CONSTRAINT | DUMP ;
privilegeList : privilege ( ',' privilege )* ;
@ -114,40 +99,4 @@ showRoleForUser : SHOW ROLE FOR user=userOrRoleName ;
showUsersForRole : SHOW USERS FOR role=userOrRoleName ;
streamQuery : createStream
| dropStream
| showStreams
| startStream
| stopStream
| startAllStreams
| stopAllStreams
| testStream
;
streamName : symbolicName ;
createStream : CREATE STREAM streamName AS LOAD DATA KAFKA
streamUri=literal WITH TOPIC streamTopic=literal WITH TRANSFORM
transformUri=literal ( batchIntervalOption )? ( batchSizeOption )? ;
batchIntervalOption : BATCH INTERVAL literal ;
batchSizeOption : BATCH SIZE literal ;
dropStream : DROP STREAM streamName ;
showStreams : SHOW STREAMS ;
startStream : START STREAM streamName ( limitBatchesOption )? ;
stopStream : STOP STREAM streamName ;
limitBatchesOption : LIMIT limitBatches=literal BATCHES ;
startAllStreams : START ALL STREAMS ;
stopAllStreams : STOP ALL STREAMS ;
testStream : K_TEST STREAM streamName ( limitBatchesOption )? ;
dumpQuery: DUMP DATABASE ;


@ -12,10 +12,7 @@ import CypherLexer ;
ALTER : A L T E R ;
AUTH : A U T H ;
BATCH : B A T C H ;
BATCHES : B A T C H E S ;
CLEAR : C L E A R ;
DATA : D A T A ;
DATABASE : D A T A B A S E ;
DENY : D E N Y ;
DROP : D R O P ;
@ -25,23 +22,12 @@ FROM : F R O M ;
GRANT : G R A N T ;
GRANTS : G R A N T S ;
IDENTIFIED : I D E N T I F I E D ;
INTERVAL : I N T E R V A L ;
K_TEST : T E S T ;
KAFKA : K A F K A ;
LOAD : L O A D ;
PASSWORD : P A S S W O R D ;
PRIVILEGES : P R I V I L E G E S ;
REVOKE : R E V O K E ;
ROLE : R O L E ;
ROLES : R O L E S ;
SIZE : S I Z E ;
START : S T A R T ;
STATS : S T A T S ;
STOP : S T O P ;
STREAM : S T R E A M ;
STREAMS : S T R E A M S ;
TO : T O ;
TOPIC : T O P I C ;
TRANSFORM : T R A N S F O R M ;
USER : U S E R ;
USERS : U S E R S ;


@ -18,10 +18,6 @@ class PrivilegeExtractor : public QueryVisitor<void>,
void Visit(AuthQuery &) override { AddPrivilege(AuthQuery::Privilege::AUTH); }
void Visit(StreamQuery &) override {
AddPrivilege(AuthQuery::Privilege::STREAM);
}
void Visit(ExplainQuery &query) override {
query.cypher_query_->Accept(*this);
}


@ -88,9 +88,8 @@ const trie::Trie kKeywords = {
"when", "then", "else", "end", "count", "filter",
"extract", "any", "none", "single", "true", "false",
"reduce", "coalesce", "user", "password", "alter", "drop",
"stream", "streams", "load", "data", "kafka", "transform",
"batch", "interval", "show", "start", "stats", "stop",
"size", "topic", "test", "unique", "explain", "profile",
"show", "stats",
"unique", "explain", "profile",
"storage", "index", "info", "exists", "assert", "constraint",
"node", "key", "dump", "database"};


@ -10,8 +10,6 @@
#endif
#include "glue/auth.hpp"
#include "glue/communication.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/cypher_main_visitor.hpp"
#include "query/frontend/opencypher/parser.hpp"
@ -507,170 +505,6 @@ Callback HandleAuthQuery(AuthQuery *auth_query, auth::Auth *auth,
}
}
Callback HandleStreamQuery(StreamQuery *stream_query,
integrations::kafka::Streams *streams,
const Parameters &parameters,
DbAccessor *db_accessor) {
// Empty frame and symbol table for evaluation of expressions. This is OK
// since all expressions should be literals or parameter lookups.
Frame frame(0);
SymbolTable symbol_table;
EvaluationContext evaluation_context;
  // TODO: MemoryResource for EvaluationContext; it should probably be passed
  // as the argument to Callback.
evaluation_context.timestamp =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
evaluation_context.parameters = parameters;
ExpressionEvaluator eval(&frame, symbol_table, evaluation_context,
db_accessor, storage::View::OLD);
std::string stream_name = stream_query->stream_name_;
auto stream_uri =
EvaluateOptionalExpression(stream_query->stream_uri_, &eval);
auto stream_topic =
EvaluateOptionalExpression(stream_query->stream_topic_, &eval);
auto transform_uri =
EvaluateOptionalExpression(stream_query->transform_uri_, &eval);
auto batch_interval_in_ms =
EvaluateOptionalExpression(stream_query->batch_interval_in_ms_, &eval);
auto batch_size =
EvaluateOptionalExpression(stream_query->batch_size_, &eval);
auto limit_batches =
EvaluateOptionalExpression(stream_query->limit_batches_, &eval);
Callback callback;
switch (stream_query->action_) {
case StreamQuery::Action::CREATE_STREAM:
callback.fn = [streams, stream_name, stream_uri, stream_topic,
transform_uri, batch_interval_in_ms, batch_size] {
CHECK(stream_uri.IsString());
CHECK(stream_topic.IsString());
CHECK(transform_uri.IsString());
CHECK(batch_interval_in_ms.IsInt() || batch_interval_in_ms.IsNull());
CHECK(batch_size.IsInt() || batch_size.IsNull());
integrations::kafka::StreamInfo info;
info.stream_name = stream_name;
info.stream_uri = stream_uri.ValueString();
info.stream_topic = stream_topic.ValueString();
info.transform_uri = transform_uri.ValueString();
info.batch_interval_in_ms =
batch_interval_in_ms.IsInt()
? std::make_optional(batch_interval_in_ms.ValueInt())
: std::nullopt;
info.batch_size = batch_size.IsInt()
? std::make_optional(batch_size.ValueInt())
: std::nullopt;
try {
streams->Create(info);
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::DROP_STREAM:
callback.fn = [streams, stream_name] {
try {
streams->Drop(stream_name);
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::SHOW_STREAMS:
callback.header = {"name", "uri", "topic", "transform", "status"};
callback.fn = [streams] {
std::vector<std::vector<TypedValue>> status;
for (const auto &stream : streams->Show()) {
status.push_back(std::vector<TypedValue>{
TypedValue(stream.stream_name), TypedValue(stream.stream_uri),
TypedValue(stream.stream_topic), TypedValue(stream.transform_uri),
TypedValue(stream.stream_status)});
}
return status;
};
return callback;
case StreamQuery::Action::START_STREAM:
callback.fn = [streams, stream_name, limit_batches] {
CHECK(limit_batches.IsInt() || limit_batches.IsNull());
try {
streams->Start(stream_name,
limit_batches.IsInt()
? std::make_optional(limit_batches.ValueInt())
: std::nullopt);
} catch (integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::STOP_STREAM:
callback.fn = [streams, stream_name] {
try {
streams->Stop(stream_name);
} catch (integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::START_ALL_STREAMS:
callback.fn = [streams] {
try {
streams->StartAll();
} catch (integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::STOP_ALL_STREAMS:
callback.fn = [streams] {
try {
streams->StopAll();
} catch (integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return std::vector<std::vector<TypedValue>>();
};
return callback;
case StreamQuery::Action::TEST_STREAM:
callback.header = {"query", "params"};
callback.fn = [streams, stream_name, limit_batches] {
CHECK(limit_batches.IsInt() || limit_batches.IsNull());
std::vector<std::vector<TypedValue>> rows;
try {
auto results = streams->Test(
stream_name, limit_batches.IsInt()
? std::make_optional(limit_batches.ValueInt())
: std::nullopt);
for (const auto &result : results) {
std::map<std::string, TypedValue> params;
for (const auto &param : result.second) {
params.emplace(param.first, glue::ToTypedValue(param.second));
}
rows.emplace_back(std::vector<TypedValue>{TypedValue(result.first),
TypedValue(params)});
}
} catch (integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return rows;
};
return callback;
}
}
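
Every branch above builds a `Callback`: a `header` with the column names and a deferred `fn` that does the actual work and returns the result rows when invoked later. The sketch below illustrates only the shape of that pattern with simplified types; `Callback` here uses plain strings instead of `TypedValue`, and `MakeShowCallback` is a made-up helper, not part of the interpreter's API.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-in for the interpreter's Callback: column names plus a
// deferred function that produces the result rows when invoked.
struct Callback {
  std::vector<std::string> header;
  std::function<std::vector<std::vector<std::string>>()> fn;
};

// Build a callback the same way the SHOW_STREAMS branch above does: capture
// what is needed and defer the actual work until fn() is called.
Callback MakeShowCallback(const std::vector<std::string> &names) {
  Callback callback;
  callback.header = {"name", "status"};
  callback.fn = [names] {
    std::vector<std::vector<std::string>> rows;
    for (const auto &name : names) rows.push_back({name, "running"});
    return rows;
  };
  return callback;
}

int main() {
  auto callback = MakeShowCallback({"stream_a", "stream_b"});
  for (const auto &row : callback.fn())
    std::cout << row[0] << "\t" << row[1] << "\n";
  return 0;
}
```

Deferring the work into `fn` lets the caller decide when, and in which execution context, the rows are actually produced.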
Callback HandleIndexQuery(IndexQuery *index_query,
std::function<void()> invalidate_plan_cache,
DbAccessor *db_accessor) {
@ -1274,19 +1108,6 @@ Interpreter::Results Interpreter::Prepare(
}
callback = HandleAuthQuery(auth_query, interpreter_context_->auth,
parsed_query.parameters, db_accessor);
#endif
} else if (auto *stream_query =
utils::Downcast<StreamQuery>(parsed_query.query)) {
#ifdef MG_SINGLE_NODE_HA
throw utils::NotYetImplemented(
"Graph streams are not yet supported in Memgraph HA instance.");
#else
if (in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
callback =
HandleStreamQuery(stream_query, interpreter_context_->kafka_streams,
parsed_query.parameters, db_accessor);
#endif
} else if (auto *info_query =
utils::Downcast<InfoQuery>(parsed_query.query)) {


@ -25,10 +25,6 @@ namespace auth {
class Auth;
} // namespace auth
namespace integrations::kafka {
class Streams;
} // namespace integrations::kafka
namespace query {
static constexpr size_t kExecutionMemoryBlockSize = 1U * 1024U * 1024U;
@ -135,7 +131,6 @@ struct InterpreterContext {
bool is_tsc_available{utils::CheckAvailableTSC()};
auth::Auth *auth{nullptr};
integrations::kafka::Streams *kafka_streams{nullptr};
utils::SkipList<QueryCacheEntry> ast_cache;
utils::SkipList<PlanCacheEntry> plan_cache;


@ -1,5 +1,2 @@
# kafka test binaries
add_subdirectory(kafka)
# ha test binaries
add_subdirectory(ha)


@ -1,14 +1,3 @@
- name: feature_benchmark__kafka
cd: kafka
commands: ./runner.sh
infiles:
- runner.sh # runner script
- transform.py # transform script
- generate.py # dataset generator script
- ../../../build_release/tests/feature_benchmark/kafka/kafka.py # kafka script
- ../../../build_release/tests/feature_benchmark/kafka/benchmark # benchmark binary
enable_network: true
- name: feature_benchmark__ha__read
cd: ha/read
commands: ./runner.sh


@ -1,9 +0,0 @@
set(target_name memgraph__feature_benchmark__kafka)
set(benchmark_target_name ${target_name}__benchmark)
add_executable(${benchmark_target_name} benchmark.cpp)
set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark)
target_link_libraries(${benchmark_target_name} mg-single-node kvstore_lib)
# Copy kafka.py to the feature benchmark kafka folder
configure_file(${PROJECT_SOURCE_DIR}/src/integrations/kafka/kafka.py ${CMAKE_CURRENT_BINARY_DIR} COPYONLY)


@ -1,116 +0,0 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <fstream>
#include <functional>
#include <limits>
#include <thread>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "json/json.hpp"
#include "database/single_node/graph_db.hpp"
#include "integrations/kafka/streams.hpp"
#include "memgraph_init.hpp"
#include "utils/flag_validation.hpp"
#include "utils/timer.hpp"
using namespace std::literals::chrono_literals;
DEFINE_int64(import_count, 0, "How many entries should we import.");
DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait.");
DEFINE_string(kafka_uri, "", "Kafka URI.");
DEFINE_string(topic_name, "", "Kafka topic.");
DEFINE_string(transform_uri, "", "Transform script URI.");
DEFINE_string(output_file, "", "Output file where the results should be stored.");
void KafkaBenchmarkMain() {
google::SetUsageMessage("Memgraph kafka benchmark database server");
auto durability_directory = std::filesystem::path(FLAGS_durability_directory);
auth::Auth auth{durability_directory / "auth"};
audit::Log audit_log{durability_directory / "audit",
audit::kBufferSizeDefault,
audit::kBufferFlushIntervalMillisDefault};
database::GraphDb db;
query::InterpreterContext interpreter_context{&db};
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
std::atomic<int64_t> query_counter{0};
std::atomic<bool> timeout_reached{false};
std::atomic<bool> benchmark_finished{false};
integrations::kafka::Streams kafka_streams{
std::filesystem::path(FLAGS_durability_directory) / "streams",
[&session_data, &query_counter](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
KafkaStreamWriter(session_data, query, params);
query_counter++;
}};
interpreter_context.auth = &auth;
interpreter_context.kafka_streams = &kafka_streams;
std::string stream_name = "benchmark";
integrations::kafka::StreamInfo stream_info;
stream_info.stream_name = stream_name;
stream_info.stream_uri = FLAGS_kafka_uri;
stream_info.stream_topic = FLAGS_topic_name;
stream_info.transform_uri = FLAGS_transform_uri;
kafka_streams.Create(stream_info);
kafka_streams.StartAll();
  // Kick off a thread that will time out after FLAGS_timeout seconds.
std::thread timeout_thread_ =
std::thread([&timeout_reached, &benchmark_finished]() {
utils::ThreadSetName("BenchTimeout");
for (int64_t i = 0; i < FLAGS_timeout; ++i) {
std::this_thread::sleep_for(1s);
if (benchmark_finished.load()) return;
}
timeout_reached.store(true);
});
// Wait for the import to start
while (!timeout_reached.load() && query_counter.load() == 0) {
std::this_thread::sleep_for(1ms);
}
int64_t query_count_start = query_counter.load();
utils::Timer timer;
// Wait for the import to finish
while (!timeout_reached.load() && query_counter.load() < FLAGS_import_count) {
std::this_thread::sleep_for(1ms);
}
double duration = timer.Elapsed().count();
kafka_streams.StopAll();
kafka_streams.Drop(stream_name);
benchmark_finished.store(true);
if (timeout_thread_.joinable()) timeout_thread_.join();
int64_t writes = query_counter.load() - query_count_start;
double write_per_second = writes / duration;
std::ofstream output(FLAGS_output_file);
output << "duration " << duration << std::endl;
output << "executed_writes " << writes << std::endl;
output << "write_per_second " << write_per_second << std::endl;
output.close();
}
int main(int argc, char **argv) {
return WithInit(argc, argv, KafkaBenchmarkMain);
}
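
The benchmark above measures import throughput with an atomic query counter, a watchdog thread that trips a timeout flag, and a wall-clock timer that starts once the first write is observed. A stripped-down sketch of that measurement loop follows; the `writer` thread, the 100000 import count, and the 60-second budget are placeholders standing in for the Kafka-driven writes and the `FLAGS_*` options of the removed benchmark.

```cpp
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

int main() {
  std::atomic<int64_t> query_counter{0};
  std::atomic<bool> timeout_reached{false};
  std::atomic<bool> benchmark_finished{false};
  const int64_t import_count = 100000;  // placeholder for FLAGS_import_count
  const int timeout_sec = 60;           // placeholder for FLAGS_timeout

  // Watchdog: trips the timeout flag unless the benchmark finishes first.
  std::thread watchdog([&] {
    for (int i = 0; i < timeout_sec; ++i) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      if (benchmark_finished.load()) return;
    }
    timeout_reached.store(true);
  });

  // Stand-in writer: in the removed benchmark this counter is bumped by the
  // Kafka stream writer for every executed query.
  std::thread writer([&] {
    while (!timeout_reached.load() && query_counter.load() < import_count) {
      query_counter.fetch_add(1);
      std::this_thread::sleep_for(std::chrono::microseconds(1));
    }
  });

  // Wait for the import to start, then time it until it finishes or times out.
  while (!timeout_reached.load() && query_counter.load() == 0)
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  int64_t query_count_start = query_counter.load();
  auto start = std::chrono::steady_clock::now();
  while (!timeout_reached.load() && query_counter.load() < import_count)
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  double duration =
      std::chrono::duration<double>(std::chrono::steady_clock::now() - start)
          .count();

  benchmark_finished.store(true);
  writer.join();
  watchdog.join();

  int64_t writes = query_counter.load() - query_count_start;
  std::printf("duration %f\n", duration);
  std::printf("executed_writes %lld\n", static_cast<long long>(writes));
  std::printf("write_per_second %f\n", writes / duration);
  return 0;
}
```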


@ -1,44 +0,0 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Kafka benchmark dataset generator.
'''
import random
import sys
from argparse import ArgumentParser
def get_edge():
from_node = random.randint(0, args.nodes)
to_node = random.randint(0, args.nodes)
while from_node == to_node:
to_node = random.randint(0, args.nodes)
return (from_node, to_node)
def parse_args():
argp = ArgumentParser(description=__doc__)
argp.add_argument("--nodes", type=int, default=100, help="Number of nodes.")
argp.add_argument("--edges", type=int, default=30, help="Number of edges.")
return argp.parse_args()
args = parse_args()
edges = set()
for i in range(args.nodes):
print("%d\n" % i)
for i in range(args.edges):
edge = get_edge()
while edge in edges:
edge = get_edge()
edges.add(edge)
print("%d %d\n" % edge)
sys.exit(0)


@ -1,143 +0,0 @@
#!/bin/bash
## Helper functions
function wait_for_server {
port=$1
while ! nc -z -w 1 127.0.0.1 $port; do
sleep 0.1
done
sleep 1
}
function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }
## Environment setup
# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"
# Create a temporary directory.
tmpdir=/tmp/memgraph_benchmark_kafka
if [ -d $tmpdir ]; then
rm -rf $tmpdir
fi
mkdir -p $tmpdir
cd $tmpdir
# Download the kafka binaries.
kafka="kafka_2.11-2.0.0"
wget -nv http://deps.memgraph.io/$kafka.tgz
tar -xf $kafka.tgz
mv $kafka kafka
# Find memgraph binaries.
binary_dir="$DIR/../../../build"
if [ ! -d $binary_dir ]; then
binary_dir="$DIR/../../../build_release"
fi
# Cleanup old kafka logs.
if [ -d /tmp/kafka-logs ]; then
rm -rf /tmp/kafka-logs
fi
if [ -d /tmp/zookeeper ]; then
rm -rf /tmp/zookeeper
fi
# Results for apollo
RESULTS="$DIR/.apollo_measurements"
# Benchmark parameters
NODES=100000
EDGES=10000
## Startup
# Start the zookeeper process and wait for it to start.
echo_info "Starting zookeeper"
./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
wait_for_server 2181
echo_success "Started zookeeper"
# Start the kafka process and wait for it to start.
echo_info "Starting kafka"
./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
wait_for_server 9092
echo_success "Started kafka"
# Create the kafka topic.
echo_info "Creating kafka topic test"
./kafka/bin/kafka-topics.sh --create \
--zookeeper 127.0.0.1:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
echo_success "Created kafka topic test"
# Start a http server to serve the transform script.
echo_info "Starting Python HTTP server"
mkdir serve
cd serve
cp "$DIR/transform.py" transform.py
python3 -m http.server &
http_server_pid=$!
wait_for_server 8000
cd ..
echo_success "Started Python HTTP server"
# Start the memgraph process and wait for it to start.
echo_info "Starting kafka benchmark"
$binary_dir/tests/feature_benchmark/kafka/benchmark \
--import-count=$(($NODES + $EDGES)) \
--timeout=60 \
--kafka-uri="127.0.0.1:9092" \
--topic-name="test" \
--transform-uri="127.0.0.1:8000/transform.py" \
--output-file=$RESULTS &
pid=$!
# Allow benchmark to initialize
sleep 5
echo_info "Generating kafka messages"
python3 "$DIR/generate.py" --nodes $NODES --edges $EDGES | \
./kafka/bin/kafka-console-producer.sh \
--broker-list 127.0.0.1:9092 \
--topic test \
> /dev/null
echo_success "Finished generating kafka messages"
wait -n $pid
code=$?
if [ $code -eq 0 ]; then
echo_success "Benchmark finished successfully"
else
echo_failure "Benchmark didn't finish successfully"
fi
## Cleanup
echo_info "Starting test cleanup"
# Shutdown the http server.
kill $http_server_pid
wait -n
# Shutdown the kafka process.
./kafka/bin/kafka-server-stop.sh
# Shutdown the zookeeper process.
./kafka/bin/zookeeper-server-stop.sh
echo_success "Test cleanup done"
[ $code -ne 0 ] && exit $code
exit 0


@ -1,15 +0,0 @@
index_done = False
def stream(batch):
global index_done
ret = []
if not index_done:
ret.append(("CREATE INDEX ON :node(num)", {}))
index_done = True
for item in batch:
message = item.decode("utf-8").strip().split()
if len(message) == 1:
ret.append(("CREATE (:node {num: $num})", {"num": message[0]}))
elif len(message) == 2:
ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]}))
return ret


@ -7,9 +7,6 @@ add_subdirectory(ssl)
# transactions test binaries
add_subdirectory(transactions)
# kafka test binaries
add_subdirectory(kafka)
# auth test binaries
add_subdirectory(auth)


@ -23,17 +23,6 @@
- ../../../build_debug/memgraph # memgraph binary
- ../../../build_debug/tests/integration/transactions/tester # tester binary
- name: integration__kafka
cd: kafka
commands: ./runner.sh
infiles:
- runner.sh # runner script
- transform.py # transform script
- ../../../build_debug/memgraph # memgraph binary
- ../../../build_debug/kafka.py # kafka script
- ../../../build_debug/tests/integration/kafka/tester # tester binary
enable_network: true
- name: integration__auth
cd: auth
commands: TIMEOUT=820 ./runner.py


@ -141,41 +141,6 @@ QUERIES = [
"SHOW USERS FOR test_role",
("AUTH",)
),
# STREAM
(
"CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH TOPIC "
"'test' WITH TRANSFORM 'http://127.0.0.1/transform.py'",
("STREAM",)
),
(
"DROP STREAM strim",
("STREAM",)
),
(
"SHOW STREAMS",
("STREAM",)
),
(
"START STREAM strim",
("STREAM",)
),
(
"STOP STREAM strim",
("STREAM",)
),
(
"START ALL STREAMS",
("STREAM",)
),
(
"STOP ALL STREAMS",
("STREAM",)
),
(
"TEST STREAM strim",
("STREAM",)
),
]
UNAUTHORIZED_ERROR = "You are not authorized to execute this query! Please " \


@ -1,6 +0,0 @@
set(target_name memgraph__integration__kafka)
set(tester_target_name ${target_name}__tester)
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-communication)


@ -1,163 +0,0 @@
#!/bin/bash
## Helper functions
function wait_for_server {
port=$1
while ! nc -z -w 1 127.0.0.1 $port; do
sleep 0.1
done
sleep 1
}
function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }
## Environment setup
# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"
# Create a temporary directory.
tmpdir=/tmp/memgraph_integration_kafka
if [ -d $tmpdir ]; then
rm -rf $tmpdir
fi
mkdir -p $tmpdir
cd $tmpdir
# Download the kafka binaries.
kafka="kafka_2.11-2.0.0"
wget -nv http://deps.memgraph.io/$kafka.tgz
tar -xf $kafka.tgz
mv $kafka kafka
# Find memgraph binaries.
binary_dir="$DIR/../../../build"
if [ ! -d $binary_dir ]; then
binary_dir="$DIR/../../../build_debug"
fi
# Cleanup old kafka logs.
if [ -d /tmp/kafka-logs ]; then
rm -rf /tmp/kafka-logs
fi
if [ -d /tmp/zookeeper ]; then
rm -rf /tmp/zookeeper
fi
## Startup
# Start the zookeeper process and wait for it to start.
echo_info "Starting zookeeper"
./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
wait_for_server 2181
echo_success "Started zookeeper"
# Start the kafka process and wait for it to start.
echo_info "Starting kafka"
./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
wait_for_server 9092
echo_success "Started kafka"
# Start the memgraph process and wait for it to start.
echo_info "Starting memgraph"
$binary_dir/memgraph &
pid=$!
wait_for_server 7687
echo_success "Started memgraph"
## Run the test
# Create the kafka topic.
echo_info "Creating kafka topic"
./kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
echo_success "Created kafka topic"
# Start a http server to serve the transform script.
echo_info "Starting Python HTTP server"
mkdir serve
cd serve
cp "$DIR/transform.py" transform.py
python3 -m http.server &
wait_for_server 8000
http_pid=$!
cd ..
echo_success "Started Python HTTP server"
# Create and start the stream in memgraph.
echo_info "Defining and starting the stream in memgraph"
$binary_dir/tests/integration/kafka/tester --step start
code1=$?
if [ $code1 -eq 0 ]; then
echo_success "Defined and started the stream in memgraph"
else
echo_failure "Couldn't define and/or start the stream in memgraph"
fi
# Wait for the streams to start up.
sleep 10
# Produce some messages.
echo_info "Producing kafka messages"
./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test <<EOF
1
2
3
4
1 2
3 4
1 4
EOF
echo_success "Produced kafka messages"
# Wait for the messages to be consumed.
sleep 10
# Verify that the received graph is good.
echo_info "Checking the received graph in memgraph"
$binary_dir/tests/integration/kafka/tester --step verify
code2=$?
if [ $code2 -eq 0 ]; then
echo_success "Checked the received graph in memgraph"
else
echo_failure "Couldn't check the received graph in memgraph"
fi
## Cleanup
echo_info "Starting test cleanup"
# Shutdown the http server.
kill $http_pid
wait -n
# Shutdown the memgraph process.
kill $pid
wait -n
code_mg=$?
# Shutdown the kafka process.
./kafka/bin/kafka-server-stop.sh
# Shutdown the zookeeper process.
./kafka/bin/zookeeper-server-stop.sh
echo_success "Test cleanup done"
# Check memgraph exit code.
if [ $code_mg -ne 0 ]; then
echo "The memgraph process didn't terminate properly!"
exit $code_mg
fi
# Exit with the exitcode of the test.
[ $code1 -ne 0 ] && exit $code1
[ $code2 -ne 0 ] && exit $code2
exit 0


@ -1,81 +0,0 @@
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "Step that should be executed on the database.");
void ExecuteQuery(communication::bolt::Client &client,
const std::string &query) {
try {
client.Execute(query, {});
} catch (const communication::bolt::ClientQueryException &e) {
LOG(FATAL) << "Couldn't execute query '" << query
<< "'! Received exception: " << e.what();
}
}
void ExecuteQueryAndCheck(communication::bolt::Client &client,
const std::string &query, int64_t value) {
try {
auto resp = client.Execute(query, {});
if (resp.records.size() == 0 || resp.records[0].size() == 0) {
LOG(FATAL) << "The query '" << query << "' didn't return records!";
}
if (resp.records[0][0].ValueInt() != value) {
LOG(FATAL) << "The query '" << query << "' was expected to return "
<< value << " but it returned "
<< resp.records[0][0].ValueInt() << "!";
}
} catch (const communication::bolt::ClientQueryException &e) {
LOG(FATAL) << "Couldn't execute query '" << query
<< "'! Received exception: " << e.what();
}
}
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
communication::Init();
io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address),
FLAGS_port);
communication::ClientContext context(FLAGS_use_ssl);
communication::bolt::Client client(&context);
client.Connect(endpoint, FLAGS_username, FLAGS_password);
if (FLAGS_step == "start") {
ExecuteQuery(client,
"CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH "
"TOPIC 'test' WITH TRANSFORM "
"'http://127.0.0.1:8000/transform.py'");
ExecuteQuery(client, "START STREAM strim");
} else if (FLAGS_step == "verify") {
ExecuteQueryAndCheck(client,
"UNWIND RANGE(1, 4) AS x MATCH (n:node {num: "
"toString(x)}) RETURN count(n)",
4);
ExecuteQueryAndCheck(client,
"UNWIND [[1, 2], [3, 4], [1, 4]] AS x MATCH (n:node "
"{num: toString(x[0])})-[e:et]-(m:node {num: "
"toString(x[1])}) RETURN count(e)",
3);
} else {
LOG(FATAL) << "Unknown step " << FLAGS_step << "!";
}
return 0;
}


@ -1,15 +0,0 @@
index_done = False
def stream(batch):
global index_done
ret = []
if not index_done:
ret.append(("CREATE INDEX ON :node(num)", {}))
index_done = True
for item in batch:
message = item.decode("utf-8").strip().split()
if len(message) == 1:
ret.append(("MERGE (:node {num: $num})", {"num": message[0]}))
elif len(message) == 2:
ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) MERGE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]}))
return ret


@ -2343,250 +2343,6 @@ TEST_P(CypherMainVisitorTest, ShowUsersForRole) {
SyntaxException);
}
TEST_P(CypherMainVisitorTest, CreateStream) {
auto check_create_stream =
[this](std::string input, const std::string &stream_name,
const std::string &stream_uri, const std::string &stream_topic,
const std::string &transform_uri,
std::optional<int64_t> batch_interval_in_ms,
std::optional<int64_t> batch_size) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->action_, StreamQuery::Action::CREATE_STREAM);
EXPECT_EQ(stream_query->stream_name_, stream_name);
ASSERT_TRUE(stream_query->stream_uri_);
ast_generator.CheckLiteral(stream_query->stream_uri_,
TypedValue(stream_uri));
ASSERT_TRUE(stream_query->stream_topic_);
ast_generator.CheckLiteral(stream_query->stream_topic_,
TypedValue(stream_topic));
ASSERT_TRUE(stream_query->transform_uri_);
ast_generator.CheckLiteral(stream_query->transform_uri_,
TypedValue(transform_uri));
if (batch_interval_in_ms) {
ASSERT_TRUE(stream_query->batch_interval_in_ms_);
ast_generator.CheckLiteral(stream_query->batch_interval_in_ms_,
TypedValue(*batch_interval_in_ms));
} else {
EXPECT_EQ(stream_query->batch_interval_in_ms_, nullptr);
}
if (batch_size) {
ASSERT_TRUE(stream_query->batch_size_);
ast_generator.CheckLiteral(stream_query->batch_size_,
TypedValue(*batch_size));
} else {
EXPECT_EQ(stream_query->batch_size_, nullptr);
}
};
check_create_stream(
"CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py'",
"stream", "localhost", "tropika", "localhost/test.py", std::nullopt,
std::nullopt);
check_create_stream(
"CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' "
"WitH TopIC 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH inTErvAL 168",
"stream", "localhost", "tropika", "localhost/test.py", 168, std::nullopt);
check_create_stream(
"CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' "
"WITH TopIC 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH SizE 17",
"stream", "localhost", "tropika", "localhost/test.py", std::nullopt, 17);
check_create_stream(
"CreaTE StreaM stream AS LOad daTA KAFKA 'localhost' "
"WitH TOPic 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH inTErvAL 168 Batch SIze 17",
"stream", "localhost", "tropika", "localhost/test.py", 168, 17);
EXPECT_THROW(check_create_stream(
"CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' "
"WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 'jedan' ",
"stream", "localhost", "tropika", "localhost/test.py", 168,
std::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py' BATCH SIZE 'jedan' ",
"stream", "localhost", "tropika", "localhost/test.py",
std::nullopt, 17),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM 123 AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 168 ",
"stream", "localhost", "tropika", "localhost/test.py", 168,
std::nullopt),
SyntaxException);
EXPECT_THROW(
check_create_stream("CREATE STREAM stream AS LOAD DATA KAFKA localhost "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py'",
"stream", "localhost", "tropika", "localhost/test.py",
std::nullopt, std::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 2"
"WITH TRANSFORM localhost/test.py BATCH INTERVAL 168 ",
"stream", "localhost", "tropika", "localhost/test.py", 168,
std::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM stream AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika'"
"WITH TRANSFORM localhost/test.py BATCH INTERVAL 168 ",
"stream", "localhost", "tropika", "localhost/test.py", 168,
std::nullopt),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, DropStream) {
auto check_drop_stream = [this](std::string input,
const std::string &stream_name) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->action_, StreamQuery::Action::DROP_STREAM);
EXPECT_EQ(stream_query->stream_name_, stream_name);
};
check_drop_stream("DRop stREAm stream", "stream");
check_drop_stream("DRop stREAm strim", "strim");
EXPECT_THROW(check_drop_stream("DROp sTREAM", ""), SyntaxException);
EXPECT_THROW(check_drop_stream("DROP STreAM 123", "123"), SyntaxException);
EXPECT_THROW(check_drop_stream("DroP STREAM '123'", "123"), SyntaxException);
}
TEST_P(CypherMainVisitorTest, ShowStreams) {
auto check_show_streams = [this](std::string input) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->action_, StreamQuery::Action::SHOW_STREAMS);
};
check_show_streams("SHOW STREAMS");
EXPECT_THROW(check_show_streams("SHOW STREAMS lololo"), SyntaxException);
}
TEST_P(CypherMainVisitorTest, StartStopStream) {
auto check_start_stop_stream = [this](std::string input,
const std::string &stream_name,
bool is_start,
std::optional<int64_t> limit_batches) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->stream_name_, stream_name);
EXPECT_EQ(stream_query->action_, is_start
? StreamQuery::Action::START_STREAM
: StreamQuery::Action::STOP_STREAM);
if (limit_batches) {
ASSERT_TRUE(is_start);
ASSERT_TRUE(stream_query->limit_batches_);
ast_generator.CheckLiteral(stream_query->limit_batches_,
TypedValue(*limit_batches));
} else {
EXPECT_EQ(stream_query->limit_batches_, nullptr);
}
};
check_start_stop_stream("stARt STreaM STREAM", "STREAM", true, std::nullopt);
check_start_stop_stream("stARt STreaM strim", "strim", true, std::nullopt);
check_start_stop_stream("StARt STreAM strim LimIT 10 BATchES", "strim", true,
10);
check_start_stop_stream("StoP StrEAM strim", "strim", false, std::nullopt);
EXPECT_THROW(check_start_stop_stream("staRT STReaM 'strim'", "strim", true,
std::nullopt),
SyntaxException);
EXPECT_THROW(check_start_stop_stream("sTART STReaM strim LImiT 'dva' BATCheS",
"strim", true, 2),
SyntaxException);
EXPECT_THROW(check_start_stop_stream("StoP STreAM 'strim'", "strim", false,
std::nullopt),
SyntaxException);
EXPECT_THROW(check_start_stop_stream("STOp sTREAM strim LIMit 2 baTCHES",
"strim", false, 2),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, StartStopAllStreams) {
auto check_start_stop_all_streams = [this](std::string input, bool is_start) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->action_,
is_start ? StreamQuery::Action::START_ALL_STREAMS
: StreamQuery::Action::STOP_ALL_STREAMS);
};
check_start_stop_all_streams("STarT AlL StreAMs", true);
check_start_stop_all_streams("StoP aLL STrEAMs", false);
EXPECT_THROW(check_start_stop_all_streams("StaRT aLL STreAM", true),
SyntaxException);
EXPECT_THROW(check_start_stop_all_streams("SToP AlL STREaM", false),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestStream) {
auto check_test_stream = [this](std::string input,
const std::string &stream_name,
std::optional<int64_t> limit_batches) {
auto &ast_generator = *GetParam();
auto *stream_query =
dynamic_cast<StreamQuery *>(ast_generator.ParseQuery(input));
ASSERT_TRUE(stream_query);
EXPECT_EQ(stream_query->stream_name_, stream_name);
EXPECT_EQ(stream_query->action_, StreamQuery::Action::TEST_STREAM);
if (limit_batches) {
ASSERT_TRUE(stream_query->limit_batches_);
ast_generator.CheckLiteral(stream_query->limit_batches_,
TypedValue(*limit_batches));
} else {
EXPECT_EQ(stream_query->limit_batches_, nullptr);
}
};
check_test_stream("TesT STreaM strim", "strim", std::nullopt);
check_test_stream("TesT STreaM STREAM", "STREAM", std::nullopt);
check_test_stream("tESt STreAM STREAM LimIT 10 BATchES", "STREAM", 10);
check_test_stream("Test StrEAM STREAM", "STREAM", std::nullopt);
EXPECT_THROW(check_test_stream("tEST STReaM 'strim'", "strim", std::nullopt),
SyntaxException);
EXPECT_THROW(
check_test_stream("test STReaM strim LImiT 'dva' BATCheS", "strim", 2),
SyntaxException);
EXPECT_THROW(check_test_stream("test STreAM 'strim'", "strim", std::nullopt),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestExplainRegularQuery) {
auto &ast_generator = *GetParam();
EXPECT_TRUE(dynamic_cast<ExplainQuery *>(
@ -2604,12 +2360,6 @@ TEST_P(CypherMainVisitorTest, TestExplainAuthQuery) {
EXPECT_THROW(ast_generator.ParseQuery("EXPLAIN SHOW ROLES"), SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestExplainStreamQuery) {
auto &ast_generator = *GetParam();
EXPECT_THROW(ast_generator.ParseQuery("EXPLAIN SHOW STREAMS"),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestProfileRegularQuery) {
{
auto &ast_generator = *GetParam();
@ -2639,12 +2389,6 @@ TEST_P(CypherMainVisitorTest, TestProfileAuthQuery) {
EXPECT_THROW(ast_generator.ParseQuery("PROFILE SHOW ROLES"), SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestProfileStreamQuery) {
auto &ast_generator = *GetParam();
EXPECT_THROW(ast_generator.ParseQuery("PROFILE SHOW STREAMS"),
SyntaxException);
}
TEST_P(CypherMainVisitorTest, TestShowStorageInfo) {
auto &ast_generator = *GetParam();
auto *query =


@ -652,37 +652,3 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
storage.Create<query::AuthQuery>((action), (user), (role), (user_or_role), \
password, (privileges))
#define DROP_USER(usernames) storage.Create<query::DropUser>((usernames))
#define CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri, \
batch_interval, batch_size) \
storage.Create<query::StreamQuery>( \
query::StreamQuery::Action::CREATE_STREAM, (stream_name), \
LITERAL(stream_uri), LITERAL(stream_topic), LITERAL(transform_uri), \
(batch_interval), (batch_size), nullptr)
#define DROP_STREAM(stream_name) \
storage.Create<query::StreamQuery>(query::StreamQuery::Action::DROP_STREAM, \
(stream_name), nullptr, nullptr, nullptr, \
nullptr, nullptr, nullptr)
#define SHOW_STREAMS \
storage.Create<query::StreamQuery>(query::StreamQuery::Action::SHOW_STREAMS, \
"", nullptr, nullptr, nullptr, nullptr, \
nullptr, nullptr)
#define START_STREAM(stream_name, limit_batches) \
storage.Create<query::StreamQuery>(query::StreamQuery::Action::START_STREAM, \
(stream_name), nullptr, nullptr, nullptr, \
nullptr, nullptr, (limit_batches))
#define STOP_STREAM(stream_name) \
storage.Create<query::StreamQuery>(query::StreamQuery::Action::STOP_STREAM, \
(stream_name), nullptr, nullptr, nullptr, \
nullptr, nullptr, nullptr)
#define START_ALL_STREAMS \
storage.Create<query::StreamQuery>( \
query::StreamQuery::Action::START_ALL_STREAMS, "", nullptr, nullptr, \
nullptr, nullptr, nullptr, nullptr)
#define STOP_ALL_STREAMS \
storage.Create<query::StreamQuery>( \
query::StreamQuery::Action::STOP_ALL_STREAMS, "", nullptr, nullptr, \
nullptr, nullptr, nullptr, nullptr)
#define TEST_STREAM(stream_name, limit_batches) \
  storage.Create<query::StreamQuery>(query::StreamQuery::Action::TEST_STREAM, \
(stream_name), nullptr, nullptr, nullptr, \
nullptr, nullptr, (limit_batches))


@ -111,28 +111,6 @@ TEST_F(TestPrivilegeExtractor, AuthQuery) {
UnorderedElementsAre(AuthQuery::Privilege::AUTH));
}
TEST_F(TestPrivilegeExtractor, StreamQuery) {
std::string stream_name("kafka");
std::string stream_uri("localhost:1234");
std::string stream_topic("tropik");
std::string transform_uri("localhost:1234/file.py");
std::vector<StreamQuery *> stream_queries = {
CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri,
nullptr, nullptr),
DROP_STREAM(stream_name),
SHOW_STREAMS,
START_STREAM(stream_name, nullptr),
STOP_STREAM(stream_name),
START_ALL_STREAMS,
STOP_ALL_STREAMS};
for (auto *query : stream_queries) {
EXPECT_THAT(GetRequiredPrivileges(query),
UnorderedElementsAre(AuthQuery::Privilege::STREAM));
}
}
TEST_F(TestPrivilegeExtractor, ShowIndexInfo) {
auto *query = storage.Create<InfoQuery>();
query->info_type_ = InfoQuery::InfoType::INDEX;