From ce306a4c218c889924ba818cfe1d4ed941e648a6 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic
Date: Mon, 6 Aug 2018 13:05:42 +0200
Subject: [PATCH] Implement Kafka Python transform

Summary:
The Kafka Python transform functionality uses a Python script to transform
incoming Kafka data into queries and parameters that are executed against the
database. The Python transform script is started in a sandboxed environment
so that it can't harm the host system or the database.

Reviewers: msantl, teon.banek

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1509
---
 init                                  |   1 +
 src/integrations/CMakeLists.txt       |  13 +-
 src/integrations/FindSeccomp.cmake    |  90 ++++
 src/integrations/kafka/consumer.cpp   | 110 ++++-
 src/integrations/kafka/consumer.hpp   |  28 +-
 src/integrations/kafka/exceptions.hpp |  11 +-
 src/integrations/kafka/kafka.py       | 162 +++++++
 src/integrations/kafka/streams.cpp    |  27 +-
 src/integrations/kafka/streams.hpp    |  25 +-
 src/integrations/kafka/transform.cpp  | 656 +++++++++++++++++++++++++-
 src/integrations/kafka/transform.hpp  |  42 +-
 src/memgraph_bolt.cpp                 |  54 ++-
 src/query/plan/operator.cpp           |  34 +-
 src/query/plan/operator.lcp           |   5 +-
 src/query/plan/rule_based_planner.hpp |   5 +-
 src/utils/file.cpp                    |  11 +
 src/utils/file.hpp                    |  11 +
 17 files changed, 1176 insertions(+), 109 deletions(-)
 create mode 100644 src/integrations/FindSeccomp.cmake
 create mode 100644 src/integrations/kafka/kafka.py

diff --git a/init b/init
index ac6e6ccf2..81f57fcb0 100755
--- a/init
+++ b/init
@@ -6,6 +6,7 @@ required_pkgs=(git arcanist # source code control
                uuid-dev default-jre-headless # required by antlr
                libreadline-dev # for memgraph console
                libssl-dev
+               libseccomp-dev
                python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests
                uuid-dev # mg-utils
                libcurl4-openssl-dev # mg-requests
diff --git a/src/integrations/CMakeLists.txt b/src/integrations/CMakeLists.txt
index 9a78112f1..24bd69658 100644
--- a/src/integrations/CMakeLists.txt
+++ b/src/integrations/CMakeLists.txt
@@ -3,7 +3,18 @@ set(integrations_src_files
     kafka/transform.cpp
     kafka/consumer.cpp)
 
+include(FindSeccomp.cmake)
+if (NOT SECCOMP_FOUND)
+  message(FATAL_ERROR "Couldn't find seccomp library!")
+endif()
+
 add_library(mg-integrations STATIC ${integrations_src_files})
 target_link_libraries(mg-integrations stdc++fs Threads::Threads fmt
                       glog gflags librdkafka++ librdkafka zlib json)
-target_link_libraries(mg-integrations mg-utils mg-requests)
+target_link_libraries(mg-integrations mg-utils mg-requests mg-communication)
+
+target_link_libraries(mg-integrations ${Seccomp_LIBRARIES})
+target_include_directories(mg-integrations SYSTEM PUBLIC ${Seccomp_INCLUDE_DIRS})
+
+# Copy kafka.py to the root of our build directory, where the memgraph
+# executable should be.
+configure_file(kafka/kafka.py ${CMAKE_BINARY_DIR} COPYONLY)
diff --git a/src/integrations/FindSeccomp.cmake b/src/integrations/FindSeccomp.cmake
new file mode 100644
index 000000000..eeb83c453
--- /dev/null
+++ b/src/integrations/FindSeccomp.cmake
@@ -0,0 +1,90 @@
+#.rst:
+# FindSeccomp
+# -----------
+#
+# Try to locate the libseccomp library.
+#
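+# A typical consumer of this module (illustrative usage only, assuming the
+# module is on ``CMAKE_MODULE_PATH``) would do:
+#
+#   find_package(Seccomp)
+#   target_link_libraries(mytarget Seccomp::Seccomp)
+#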
+# If found, this will define the following variables: +# +# ``Seccomp_FOUND`` +# True if the seccomp library is available +# ``Seccomp_INCLUDE_DIRS`` +# The seccomp include directories +# ``Seccomp_LIBRARIES`` +# The seccomp libraries for linking +# +# If ``Seccomp_FOUND`` is TRUE, it will also define the following +# imported target: +# +# ``Seccomp::Seccomp`` +# The Seccomp library +# +# Since 5.44.0. + +#============================================================================= +# Copyright (c) 2017 Martin Flöser +# Copyright (c) 2017 David Kahles +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# 3. The name of the author may not be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR +# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES +# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT +# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF +# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#============================================================================= + +find_package(PkgConfig QUIET) +pkg_check_modules(PKG_Libseccomp QUIET libseccomp) + +find_path(Seccomp_INCLUDE_DIRS + NAMES + seccomp.h + HINTS + ${PKG_Libseccomp_INCLUDE_DIRS} +) +find_library(Seccomp_LIBRARIES + NAMES + seccomp + HINTS + ${PKG_Libseccomp_LIBRARY_DIRS} +) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Seccomp + FOUND_VAR + Seccomp_FOUND + REQUIRED_VARS + Seccomp_LIBRARIES + Seccomp_INCLUDE_DIRS +) + +if (Seccomp_FOUND AND NOT TARGET Seccomp::Seccomp) + add_library(Seccomp::Seccomp UNKNOWN IMPORTED) + set_target_properties(Seccomp::Seccomp PROPERTIES + IMPORTED_LOCATION "${Seccomp_LIBRARIES}" + INTERFACE_INCLUDE_DIRECTORIES "${Seccomp_INCLUDE_DIRS}" + ) +endif() + +mark_as_advanced(Seccomp_LIBRARIES Seccomp_INCLUDE_DIRS) + +include(FeatureSummary) +set_package_properties(Seccomp PROPERTIES + URL "https://github.com/seccomp/libseccomp" + DESCRIPTION "The enhanced seccomp library." 
+)
diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index 6947a1765..0d491703c 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -7,9 +7,9 @@
 
 #include "integrations/kafka/exceptions.hpp"
 #include "utils/on_scope_exit.hpp"
+#include "utils/thread.hpp"
 
-namespace integrations {
-namespace kafka {
+namespace integrations::kafka {
 
 using namespace std::chrono_literals;
 
@@ -30,10 +30,13 @@ void Consumer::event_cb(RdKafka::Event &event) {
 
 Consumer::Consumer(
     const StreamInfo &info, const std::string &transform_script_path,
-    std::function<void(const std::vector<std::string> &)> stream_writer)
+    std::function<
+        void(const std::string &,
+             const std::map<std::string, communication::bolt::Value> &)>
+        stream_writer)
     : info_(info),
-      stream_writer_(stream_writer),
-      transform_(transform_script_path) {
+      transform_script_path_(transform_script_path),
+      stream_writer_(stream_writer) {
   std::unique_ptr<RdKafka::Conf> conf(
       RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
   std::string error;
@@ -112,13 +115,41 @@ void Consumer::StartConsuming(
   is_running_.store(true);
 
   thread_ = std::thread([this, limit_batches]() {
+    utils::ThreadSetName("StreamKafka");
+
     int64_t batch_count = 0;
+    Transform transform(transform_script_path_);
+
+    transform_alive_.store(false);
+    if (!transform.Start()) {
+      LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                   << " couldn't start the transform script!";
+      return;
+    }
+    transform_alive_.store(true);
 
     while (is_running_) {
       // TODO (msantl): Figure out what to do with potential exceptions here.
       auto batch = this->GetBatch();
-      auto transformed_batch = transform_.Apply(batch);
-      stream_writer_(transformed_batch);
+
+      if (batch.empty()) continue;
+
+      // All exceptions that could possibly be thrown by the `Apply` function
+      // must be handled here because they *will* crash the database if
+      // uncaught!
+      // TODO (mferencevic): Figure out what to do with all other exceptions.
+      try {
+        transform.Apply(batch, stream_writer_);
+      } catch (const TransformExecutionException &) {
+        LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                     << " the transform process has died!";
+        break;
+      } catch (const utils::BasicException &e) {
+        LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                     << " the transform process received an exception: "
+                     << e.what();
+        break;
+      }
 
       if (limit_batches != std::experimental::nullopt) {
         if (limit_batches <= ++batch_count) {
@@ -127,6 +158,8 @@ void Consumer::StartConsuming(
         }
       }
     }
+
+    transform_alive_.store(false);
   });
 }
 
@@ -153,7 +186,8 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
       break;
 
     default:
-      LOG(ERROR) << "[Kafka] Consumer error: " << msg->errstr();
+      LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                   << " consumer error: " << msg->errstr();
       run_batch = false;
       is_running_.store(false);
      break;
@@ -217,8 +251,10 @@ void Consumer::StopIfRunning() {
   }
 }
 
-std::vector<std::string> Consumer::Test(
-    std::experimental::optional<int64_t> limit_batches) {
+std::vector<
+    std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
+Consumer::Test(std::experimental::optional<int64_t> limit_batches) {
+  // All exceptions thrown here are handled by the Bolt protocol.
  if (!consumer_) {
    throw ConsumerNotAvailableException(info_.stream_name);
  }
@@ -227,29 +263,71 @@
    throw ConsumerRunningException(info_.stream_name);
  }
 
+  Transform transform(transform_script_path_);
+
   int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
-  std::vector<std::string> results;
+  std::vector<
+      std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
+      results;
 
   is_running_.store(true);
 
   utils::OnScopeExit cleanup([this]() { is_running_.store(false); });
 
+  transform_alive_.store(false);
+  if (!transform.Start()) {
+    LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                 << " couldn't start the transform script!";
+    throw TransformExecutionException("Couldn't start the transform script!");
+  }
+  transform_alive_.store(true);
+
   for (int64_t i = 0; i < num_of_batches; ++i) {
     auto batch = GetBatch();
-    auto transformed_batch = transform_.Apply(batch);
 
-    for (auto &record : transformed_batch) {
-      results.emplace_back(std::move(record));
+    // Exceptions thrown by `Apply` are handled in Bolt.
+    // Wrap the `TransformExecutionException` into a new exception with a
+    // message that isn't so specific so the user doesn't get confused.
+    try {
+      transform.Apply(
+          batch,
+          [&results](
+              const std::string &query,
+              const std::map<std::string, communication::bolt::Value> &params) {
+            results.push_back({query, params});
+          });
+    } catch (const TransformExecutionException &) {
+      LOG(WARNING) << "[Kafka] stream " << info_.stream_name
+                   << " the transform process has died!";
+      throw TransformExecutionException(
+          "The transform script contains a runtime error!");
     }
   }
 
+  transform_alive_.store(false);
+
   return results;
 }
 
+StreamStatus Consumer::Status() {
+  StreamStatus ret;
+  ret.stream_name = info_.stream_name;
+  ret.stream_uri = info_.stream_uri;
+  ret.stream_topic = info_.stream_topic;
+  ret.transform_uri = info_.transform_uri;
+  if (!is_running_) {
+    ret.stream_status = "stopped";
+  } else if (!transform_alive_) {
+    ret.stream_status = "error";
+  } else {
+    ret.stream_status = "running";
+  }
+  return ret;
+}
+
 StreamInfo Consumer::info() {
   info_.is_running = is_running_;
   return info_;
 }
 
-}  // namespace kafka
-}  // namespace integrations
+}  // namespace integrations::kafka
diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp
index a128fd3e8..2cfa9f236 100644
--- a/src/integrations/kafka/consumer.hpp
+++ b/src/integrations/kafka/consumer.hpp
@@ -10,6 +10,7 @@
 
 #include "rdkafkacpp.h"
 
+#include "communication/bolt/v1/value.hpp"
 #include "integrations/kafka/transform.hpp"
 
 namespace integrations {
@@ -28,12 +29,23 @@ struct StreamInfo {
   bool is_running = false;
 };
 
+struct StreamStatus {
+  std::string stream_name;
+  std::string stream_uri;
+  std::string stream_topic;
+  std::string transform_uri;
+  std::string stream_status;
+};
+
 class Consumer final : public RdKafka::EventCb {
  public:
  Consumer() = delete;
 
  Consumer(const StreamInfo &info, const std::string &transform_script_path,
-           std::function<void(const std::vector<std::string> &)> stream_writer);
+           std::function<
+               void(const std::string &,
+                    const std::map<std::string, communication::bolt::Value> &)>
+               stream_writer);
 
  Consumer(const Consumer &other) = delete;
  Consumer(Consumer &&other) = delete;
@@ -49,19 +61,23 @@ class Consumer final : public RdKafka::EventCb {
 
  void StopIfRunning();
 
-  std::vector<std::string> Test(
-      std::experimental::optional<int64_t> limit_batches);
+  std::vector<
+      std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
+  Test(std::experimental::optional<int64_t> limit_batches);
+
+  StreamStatus Status();
 
  StreamInfo info();
 
 private:
  StreamInfo info_;
  std::string transform_script_path_;
-  std::function<void(const std::vector<std::string> &)> stream_writer_;
-
-  Transform transform_;
+  std::function<void(
+      const std::string &,
+      const std::map<std::string, communication::bolt::Value> &)>
+      stream_writer_;
 
  std::atomic<bool>
is_running_{false}; + std::atomic transform_alive_{false}; std::thread thread_; std::unique_ptr -namespace integrations { -namespace kafka { +namespace integrations::kafka { + class KafkaStreamException : public utils::BasicException { using utils::BasicException::BasicException; }; @@ -117,5 +117,8 @@ class TransformScriptCouldNotBeDeletedException : public KafkaStreamException { "Couldn't delete transform script for stream {}", stream_name)) {} }; -} // namespace kafka -} // namespace integrations +class TransformExecutionException : public KafkaStreamException { + using KafkaStreamException::KafkaStreamException; +}; + +} // namespace integrations::kafka diff --git a/src/integrations/kafka/kafka.py b/src/integrations/kafka/kafka.py new file mode 100644 index 000000000..749ea8976 --- /dev/null +++ b/src/integrations/kafka/kafka.py @@ -0,0 +1,162 @@ +#!/usr/bin/python3 + +import os +import struct + +# Import the target transform script. + +import transform + + +# Constants used for communicating with the memgraph process. + +COMMUNICATION_TO_PYTHON_FD = 1000 +COMMUNICATION_FROM_PYTHON_FD = 1002 + + +# Functions used to get data from the memgraph process. + +def get_data(num_bytes): + data = bytes() + while len(data) < num_bytes: + data += os.read(COMMUNICATION_TO_PYTHON_FD, num_bytes - len(data)) + return data + + +def get_size(): + fmt = "I" # uint32_t + data = get_data(struct.calcsize(fmt)) + return struct.unpack(fmt, data)[0] + + +def get_batch(): + batch = [] + count = get_size() + for i in range(count): + size = get_size() + batch.append(get_data(size)) + return batch + + +# Functions used to put data to the memgraph process. + +TYPE_NONE = 0x10 +TYPE_BOOL_FALSE = 0x20 +TYPE_BOOL_TRUE = 0x21 +TYPE_INT = 0x30 +TYPE_FLOAT = 0x40 +TYPE_STR = 0x50 +TYPE_LIST = 0x60 +TYPE_DICT = 0x70 + + +def put_data(data): + written = 0 + while written < len(data): + written += os.write(COMMUNICATION_FROM_PYTHON_FD, data[written:]) + + +def put_size(size): + fmt = "I" # uint32_t + put_data(struct.pack(fmt, size)) + + +def put_type(typ): + fmt = "B" # uint8_t + put_data(struct.pack(fmt, typ)) + + +def put_string(value): + data = value.encode("utf-8") + put_size(len(data)) + put_data(data) + + +def put_value(value, ids): + if value is None: + put_type(TYPE_NONE) + elif type(value) is bool: + if value: + put_type(TYPE_BOOL_TRUE) + else: + put_type(TYPE_BOOL_FALSE) + elif type(value) is int: + put_type(TYPE_INT) + put_data(struct.pack("q", value)) # int64_t + elif type(value) is float: + put_type(TYPE_FLOAT) + put_data(struct.pack("d", value)) # double + elif type(value) is str: + put_type(TYPE_STR) + put_string(value) + elif type(value) is list: + if id(value) in ids: + raise ValueError("Recursive objects are not supported!") + ids_new = ids + [id(value)] + put_type(TYPE_LIST) + put_size(len(value)) + for item in value: + put_value(item, ids_new) + elif type(value) is dict: + if id(value) in ids: + raise ValueError("Recursive objects are not supported!") + ids_new = ids + [id(value)] + put_type(TYPE_DICT) + put_size(len(value)) + for key, item in value.items(): + if type(key) is not str: + raise TypeError("Dictionary keys must be strings!") + put_string(key) + put_value(item, ids_new) + else: + raise TypeError("Unsupported value type {}!".format(str(type(value)))) + + +# Functions used to continuously process data. 
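+
+# For illustration only: the user-supplied `transform` module imported above
+# must expose a `stream(batch)` function that maps a list of raw message
+# payloads (bytes) to a list of (query, params) pairs, as verified by
+# `process_batch` below. A minimal module (the query and property names here
+# are made up) could look like:
+#
+#     def stream(batch):
+#         items = []
+#         for message in batch:
+#             value = message.decode("utf-8")
+#             items.append(("CREATE (n {value: $value})", {"value": value}))
+#         return items
+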
+def put_params(params):
+    if type(params) != dict:
+        raise TypeError("Parameters must be a dict!")
+    put_value(params, [])
+
+
+class StreamError(Exception):
+    pass
+
+
+def process_batch():
+    # Get the data that should be transformed.
+    batch = get_batch()
+
+    # Transform the data.
+    ret = transform.stream(batch)
+
+    # Sanity checks for the transformed data.
+    if type(ret) != list:
+        raise StreamError("The transformed items must be a list!")
+    for item in ret:
+        if type(item) not in [list, tuple]:
+            raise StreamError("The transformed item must be a tuple "
+                              "or a list!")
+        if len(item) != 2:
+            raise StreamError("There must be exactly two elements in the "
+                              "transformed item!")
+        if type(item[0]) != str:
+            raise StreamError("The first transformed element of an item "
+                              "must be a string!")
+        if type(item[1]) != dict:
+            raise StreamError("The second transformed element of an item "
+                              "must be a dictionary!")
+
+    # Send the items to the server.
+    put_size(len(ret))
+    for query, params in ret:
+        put_string(query)
+        put_params(params)
+
+
+# Main entry point.
+
+if __name__ == "__main__":
+    while True:
+        process_batch()
diff --git a/src/integrations/kafka/streams.cpp b/src/integrations/kafka/streams.cpp
index cc0165c06..cae07a68c 100644
--- a/src/integrations/kafka/streams.cpp
+++ b/src/integrations/kafka/streams.cpp
@@ -10,8 +10,7 @@
 #include "requests/requests.hpp"
 #include "utils/file.hpp"
 
-namespace integrations {
-namespace kafka {
+namespace integrations::kafka {
 
 namespace fs = std::experimental::filesystem;
 
@@ -101,9 +100,11 @@ StreamInfo Deserialize(const nlohmann::json &data) {
 
 }  // namespace
 
-Streams::Streams(
-    const std::string &streams_directory,
-    std::function<void(const std::vector<std::string> &)> stream_writer)
+Streams::Streams(const std::string &streams_directory,
+                 std::function<void(
+                     const std::string &,
+                     const std::map<std::string, communication::bolt::Value> &)>
+                     stream_writer)
     : streams_directory_(streams_directory),
      stream_writer_(stream_writer),
      metadata_store_(fs::path(streams_directory) / kMetadataDir) {}
@@ -240,19 +241,20 @@ void Streams::StopAll() {
  }
 }
 
-std::vector<StreamInfo> Streams::Show() {
-  std::vector<StreamInfo> streams;
+std::vector<StreamStatus> Streams::Show() {
+  std::vector<StreamStatus> streams;
  std::lock_guard<std::mutex> g(mutex_);
 
  for (auto &consumer_kv : consumers_) {
-    streams.emplace_back(consumer_kv.second.info());
+    streams.emplace_back(consumer_kv.second.Status());
  }
 
  return streams;
 }
 
-std::vector<std::string> Streams::Test(
-    const std::string &stream_name,
-    std::experimental::optional<int64_t> limit_batches) {
+std::vector<
+    std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
+Streams::Test(const std::string &stream_name,
+              std::experimental::optional<int64_t> limit_batches) {
  std::lock_guard<std::mutex> g(mutex_);
  auto find_it = consumers_.find(stream_name);
  if (find_it == consumers_.end())
@@ -269,5 +271,4 @@ std::string Streams::GetTransformScriptPath(const std::string &stream_name) {
  return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt);
 }
 
-}  // namespace kafka
-}  // namespace integrations
+}  // namespace integrations::kafka
diff --git a/src/integrations/kafka/streams.hpp b/src/integrations/kafka/streams.hpp
index 83e846203..1701a9295 100644
--- a/src/integrations/kafka/streams.hpp
+++ b/src/integrations/kafka/streams.hpp
@@ -8,13 +8,15 @@
 
 #include "storage/kvstore.hpp"
 
-namespace integrations {
-namespace kafka {
+namespace integrations::kafka {
 
 class Streams final {
 public:
  Streams(const std::string &streams_directory,
-          std::function<void(const std::vector<std::string> &)> stream_writer);
+          std::function<
+              void(const std::string &,
+                   const std::map<std::string, communication::bolt::Value> &)>
+              stream_writer);
 
  void Recover();
 
@@ -32,15 +34,19 @@ class Streams final {
 
  void StopAll();
 
-  std::vector<StreamInfo> Show();
+  std::vector<StreamStatus> Show();
 
-  std::vector<std::string>
Test(const std::string &stream_name, - std::experimental::optional - batch_limit = std::experimental::nullopt); + std::vector< + std::pair>> + Test(const std::string &stream_name, + std::experimental::optional batch_limit = + std::experimental::nullopt); private: std::string streams_directory_; - std::function &)> stream_writer_; + std::function &)> + stream_writer_; storage::KVStore metadata_store_; @@ -51,5 +57,4 @@ class Streams final { std::string GetTransformScriptPath(const std::string &stream_name); }; -} // namespace kafka -} // namespace integrations +} // namespace integrations::kafka diff --git a/src/integrations/kafka/transform.cpp b/src/integrations/kafka/transform.cpp index a7d86fec8..de6d617e9 100644 --- a/src/integrations/kafka/transform.cpp +++ b/src/integrations/kafka/transform.cpp @@ -1,22 +1,654 @@ #include "integrations/kafka/transform.hpp" -namespace integrations { -namespace kafka { +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "communication/bolt/v1/value.hpp" +#include "integrations/kafka/exceptions.hpp" +#include "utils/file.hpp" + +DEFINE_string(python_interpreter, "/usr/bin/python3", + "Path to the Python 3.x interpreter that should be used"); + +namespace { + +/////////////////////// +// Namespace shortcuts. +/////////////////////// + +using communication::bolt::Value; +using integrations::kafka::TargetArguments; +using integrations::kafka::TransformExecutionException; +namespace fs = std::experimental::filesystem; + +///////////////////////////////////////////////////////////////////////// +// Constants used for starting and communicating with the target process. +///////////////////////////////////////////////////////////////////////// + +const int kPipeReadEnd = 0; +const int kPipeWriteEnd = 1; + +const int kCommunicationToPythonFd = 1000; +const int kCommunicationFromPythonFd = 1002; + +const int kTerminateTimeoutSec = 5; + +const std::string kHelperScriptName = "kafka.py"; +const std::string kTransformScriptName = "transform.py"; + +//////////////////// +// Helper functions. +//////////////////// + +fs::path GetTemporaryPath(pid_t pid) { + return fs::temp_directory_path() / "memgraph" / + fmt::format("transform_{}", pid); +} + +fs::path GetHelperScriptPath() { + char path[PATH_MAX]; + memset(path, 0, PATH_MAX); + auto ret = readlink("/proc/self/exe", path, PATH_MAX); + if (ret < 0) return ""; + return fs::path() / std::string(dirname(path)) / kHelperScriptName; +} + +std::string GetEnvironmentVariable(const std::string &name) { + char *value = secure_getenv(name.c_str()); + if (value == nullptr) return ""; + return {value}; +} + +/////////////////////////////////////////// +// char** wrapper used for C library calls. 
+/////////////////////////////////////////// + +const int kCharppMaxElements = 20; + +class CharPP final { + public: + CharPP() { memset(data_, 0, sizeof(char *) * kCharppMaxElements); } + + ~CharPP() { + for (size_t i = 0; i < size_; ++i) { + free(data_[i]); + } + } + + CharPP(const CharPP &) = delete; + CharPP(CharPP &&) = delete; + CharPP &operator=(const CharPP &) = delete; + CharPP &operator=(CharPP &&) = delete; + + void Add(const char *value) { + if (size_ == kCharppMaxElements) return; + int len = strlen(value); + char *item = (char *)malloc(sizeof(char) * (len + 1)); + if (item == nullptr) return; + memcpy(item, value, len); + item[len] = 0; + data_[size_++] = item; + } + + void Add(const std::string &value) { Add(value.c_str()); } + + char **Get() { return data_; } + + private: + char *data_[kCharppMaxElements]; + size_t size_{0}; +}; + +//////////////////////////////////// +// Security functions and constants. +//////////////////////////////////// + +const std::vector seccomp_syscalls_allowed = { + SCMP_SYS(read), + SCMP_SYS(write), + SCMP_SYS(close), + SCMP_SYS(stat), + SCMP_SYS(fstat), + SCMP_SYS(lstat), + SCMP_SYS(poll), + SCMP_SYS(lseek), + SCMP_SYS(mmap), + SCMP_SYS(mprotect), + SCMP_SYS(munmap), + SCMP_SYS(brk), + SCMP_SYS(rt_sigaction), + SCMP_SYS(rt_sigprocmask), + SCMP_SYS(rt_sigreturn), + SCMP_SYS(ioctl), + SCMP_SYS(pread64), + SCMP_SYS(pwrite64), + SCMP_SYS(readv), + SCMP_SYS(writev), + SCMP_SYS(access), + SCMP_SYS(select), + SCMP_SYS(mremap), + SCMP_SYS(msync), + SCMP_SYS(mincore), + SCMP_SYS(madvise), + SCMP_SYS(dup), + SCMP_SYS(dup2), + SCMP_SYS(pause), + SCMP_SYS(nanosleep), + SCMP_SYS(getpid), + SCMP_SYS(sendfile), + SCMP_SYS(execve), + SCMP_SYS(exit), + SCMP_SYS(uname), + SCMP_SYS(fcntl), + SCMP_SYS(fsync), + SCMP_SYS(fdatasync), + SCMP_SYS(getdents), + SCMP_SYS(getcwd), + SCMP_SYS(readlink), + SCMP_SYS(gettimeofday), + SCMP_SYS(getrlimit), + SCMP_SYS(getrusage), + SCMP_SYS(getuid), + SCMP_SYS(getgid), + SCMP_SYS(geteuid), + SCMP_SYS(getegid), + SCMP_SYS(getppid), + SCMP_SYS(getpgrp), + SCMP_SYS(rt_sigpending), + SCMP_SYS(rt_sigtimedwait), + SCMP_SYS(rt_sigsuspend), + SCMP_SYS(sched_setparam), + SCMP_SYS(mlock), + SCMP_SYS(munlock), + SCMP_SYS(mlockall), + SCMP_SYS(munlockall), + SCMP_SYS(arch_prctl), + SCMP_SYS(ioperm), + SCMP_SYS(time), + SCMP_SYS(futex), + SCMP_SYS(set_tid_address), + SCMP_SYS(clock_gettime), + SCMP_SYS(clock_getres), + SCMP_SYS(clock_nanosleep), + SCMP_SYS(exit_group), + SCMP_SYS(mbind), + SCMP_SYS(set_mempolicy), + SCMP_SYS(get_mempolicy), + SCMP_SYS(migrate_pages), + SCMP_SYS(openat), + SCMP_SYS(pselect6), + SCMP_SYS(ppoll), + SCMP_SYS(set_robust_list), + SCMP_SYS(get_robust_list), + SCMP_SYS(tee), + SCMP_SYS(move_pages), + SCMP_SYS(dup3), + SCMP_SYS(preadv), + SCMP_SYS(pwritev), + SCMP_SYS(getrandom), + SCMP_SYS(sigaltstack), + SCMP_SYS(gettid), + SCMP_SYS(tgkill), + SCMP_SYS(sysinfo), +}; + +bool SetupSeccomp() { + // Initialize the seccomp context. + scmp_filter_ctx ctx; + ctx = seccomp_init(SCMP_ACT_TRAP); + if (ctx == NULL) return false; + + // First we deny access to the `open` system call called with `O_WRONLY`, + // `O_RDWR` and `O_CREAT`. 
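+  // (For reference: `SCMP_A1(SCMP_CMP_MASKED_EQ, mask, datum)` matches when
+  // `arg1 & mask == datum`, so each of the three rules below kills the
+  // process whenever the respective flag bit is set in the `flags` argument
+  // of `open`, regardless of any other bits.)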
+ if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, + SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY, O_WRONLY)) != 0) { + seccomp_release(ctx); + return false; + } + if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, + SCMP_A1(SCMP_CMP_MASKED_EQ, O_RDWR, O_RDWR)) != 0) { + seccomp_release(ctx); + return false; + } + if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1, + SCMP_A1(SCMP_CMP_MASKED_EQ, O_CREAT, O_CREAT)) != 0) { + seccomp_release(ctx); + return false; + } + // Now we allow the `open` system call without the blocked flags. + if (seccomp_rule_add( + ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1, + SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY | O_RDWR | O_CREAT, 0)) != 0) { + seccomp_release(ctx); + return false; + } + + // Add all general allow rules. + for (auto syscall_num : seccomp_syscalls_allowed) { + if (seccomp_rule_add(ctx, SCMP_ACT_ALLOW, syscall_num, 0) != 0) { + seccomp_release(ctx); + return false; + } + } + + // Load the context for the current process. + auto ret = seccomp_load(ctx); + + // Free the context and return success/failure. + seccomp_release(ctx); + return ret == 0; +} + +bool SetLimit(int resource, rlim_t n) { + struct rlimit limit; + limit.rlim_cur = limit.rlim_max = n; + return setrlimit(resource, &limit) == 0; +} + +/////////////////////////////////////////////////////// +// Target function used to start the transform process. +/////////////////////////////////////////////////////// + +int Target(void *arg) { + // NOTE: (D)LOG shouldn't be used here because it wasn't initialized in this + // process and something really bad could happen. + + // Get a pointer to the passed arguments. + TargetArguments *ta = reinterpret_cast(arg); + + // Redirect `stdin` to `/dev/null`. + int fd = open("/dev/null", O_RDONLY | O_CLOEXEC); + if (fd == -1) { + return EXIT_FAILURE; + } + if (dup2(fd, STDIN_FILENO) != STDIN_FILENO) { + return EXIT_FAILURE; + } + + // Redirect `stdout` to `/dev/null`. + fd = open("/dev/null", O_WRONLY | O_CLOEXEC); + if (fd == -1) { + return EXIT_FAILURE; + } + if (dup2(fd, STDOUT_FILENO) != STDOUT_FILENO) { + return EXIT_FAILURE; + } + + // Redirect `stderr` to `/dev/null`. + fd = open("/dev/null", O_WRONLY | O_CLOEXEC); + if (fd == -1) { + return EXIT_FAILURE; + } + if (dup2(fd, STDERR_FILENO) != STDERR_FILENO) { + return EXIT_FAILURE; + } + + // Create the working directory. + fs::path working_path = GetTemporaryPath(getpid()); + utils::DeleteDir(working_path); + if (!utils::EnsureDir(working_path)) { + return EXIT_FAILURE; + } + + // Copy all scripts to the working directory. + if (!utils::CopyFile(GetHelperScriptPath(), + working_path / kHelperScriptName)) { + return EXIT_FAILURE; + } + if (!utils::CopyFile(ta->transform_script_path, + working_path / kTransformScriptName)) { + return EXIT_FAILURE; + } + + // Change the current directory to the working directory. + if (chdir(working_path.c_str()) != 0) { + return EXIT_FAILURE; + } + + // Create the executable CharPP object. + CharPP exe; + exe.Add(FLAGS_python_interpreter); + exe.Add(kHelperScriptName); + + // Create the environment CharPP object. + CharPP env; + env.Add(fmt::format("PATH={}", GetEnvironmentVariable("PATH"))); + // TODO (mferencevic): Change this to the effective user. 
+  env.Add(fmt::format("USER={}", GetEnvironmentVariable("USER")));
+  env.Add(fmt::format("HOME={}", working_path));
+  env.Add("LANG=en_US.utf8");
+  env.Add("LANGUAGE=en_US:en");
+  env.Add("PYTHONUNBUFFERED=1");
+  env.Add("PYTHONIOENCODING=utf-8");
+  env.Add("PYTHONDONTWRITEBYTECODE=1");
+
+  // Connect the communication input pipe.
+  if (dup2(ta->pipe_to_python, kCommunicationToPythonFd) !=
+      kCommunicationToPythonFd) {
+    return EXIT_FAILURE;
+  }
+
+  // Connect the communication output pipe.
+  if (dup2(ta->pipe_from_python, kCommunicationFromPythonFd) !=
+      kCommunicationFromPythonFd) {
+    return EXIT_FAILURE;
+  }
+
+  // Set process limits.
+  // Disable core dumps.
+  if (!SetLimit(RLIMIT_CORE, 0)) {
+    return EXIT_FAILURE;
+  }
+  // Disable file creation.
+  if (!SetLimit(RLIMIT_FSIZE, 0)) {
+    return EXIT_FAILURE;
+  }
+  // Set process number limit.
+  if (!SetLimit(RLIMIT_NPROC, 0)) {
+    return EXIT_FAILURE;
+  }
+
+  // TODO (mferencevic): Change the user to `nobody`.
+
+  // Setup seccomp.
+  if (!SetupSeccomp()) {
+    return EXIT_FAILURE;
+  }
+
+  execve(*exe.Get(), exe.Get(), env.Get());
+
+  // TODO (mferencevic): Log an error with `errno` about what failed.
+
+  return EXIT_FAILURE;
+}
+
+/////////////////////////////////////////////////////////////
+// Functions used to send data to the started Python process.
+/////////////////////////////////////////////////////////////
+
+/// The data that is being sent to the Python process is always a
+/// `std::vector` of data. It is sent in the following way:
+///
+/// uint32_t number of elements being sent
+/// uint32_t element 0 size
+/// uint8_t[] element 0 data
+/// uint32_t element 1 size
+/// uint8_t[] element 1 data
+/// ...
+///
+/// The receiving end of the protocol is implemented in `kafka.py`.
+
+void PutData(int fd, const uint8_t *data, uint32_t size) {
+  int ret = 0;
+  uint32_t put = 0;
+  while (put < size) {
+    ret = write(fd, data + put, size - put);
+    if (ret > 0) {
+      put += ret;
+    } else if (ret == 0) {
+      throw TransformExecutionException(
+          "The communication pipe to the transform process was closed!");
+    } else if (errno != EINTR) {
+      throw TransformExecutionException(
+          "Couldn't put data to the transform process!");
+    }
+  }
+}
+
+void PutSize(int fd, uint32_t size) {
+  PutData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
+}
+
+//////////////////////////////////////////////////////////////
+// Functions used to get data from the started Python process.
+//////////////////////////////////////////////////////////////
+
+/// The data that is being sent from the Python process is always a
+/// `std::vector<std::pair<std::string, Value>>` of data (array of pairs of
+/// query and params). It is sent in the following way:
+///
+/// uint32_t number of elements being sent
+/// uint32_t element 0 query size
+/// char[] element 0 query data
+/// data[] element 0 params
+/// uint32_t element 1 query size
+/// char[] element 1 query data
+/// data[] element 1 params
+/// ...
+///
+/// When sending the query parameters they have to be further encoded to enable
+/// sending of None, Bool, Int, Float, Str, List and Dict objects. The encoding
+/// is as follows:
+///
+/// None: uint8_t type (kTypeNone)
+/// Bool: uint8_t type (kTypeBoolFalse or kTypeBoolTrue)
+/// Int: uint8_t type (kTypeInt), int64_t value
+/// Float: uint8_t type (kTypeFloat), double value
+/// Str: uint8_t type (kTypeStr), uint32_t size, char[] data
+/// List: uint8_t type (kTypeList), uint32_t size, data[] element 0,
+///       data[] element 1, ...
+/// Dict: uint8_t type (kTypeDict), uint32_t size, uint32_t element 0 key size,
+///       char[] element 0 key data, data[] element 0 value,
+///       uint32_t element 1 key size, char[] element 1 key data,
+///       data[] element 1 value, ...
+///
+/// The sending end of the protocol is implemented in `kafka.py`.
+
+const uint8_t kTypeNone = 0x10;
+const uint8_t kTypeBoolFalse = 0x20;
+const uint8_t kTypeBoolTrue = 0x21;
+const uint8_t kTypeInt = 0x30;
+const uint8_t kTypeFloat = 0x40;
+const uint8_t kTypeStr = 0x50;
+const uint8_t kTypeList = 0x60;
+const uint8_t kTypeDict = 0x70;
+
+void GetData(int fd, uint8_t *data, uint32_t size) {
+  int ret = 0;
+  uint32_t got = 0;
+  while (got < size) {
+    ret = read(fd, data + got, size - got);
+    if (ret > 0) {
+      got += ret;
+    } else if (ret == 0) {
+      throw TransformExecutionException(
+          "The communication pipe from the transform process was closed!");
+    } else if (errno != EINTR) {
+      throw TransformExecutionException(
+          "Couldn't get data from the transform process!");
+    }
+  }
+}
+
+uint32_t GetSize(int fd) {
+  uint32_t size = 0;
+  GetData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
+  return size;
+}
+
+void GetString(int fd, std::string *value) {
+  const int kMaxStackBuffer = 8192;
+  uint8_t buffer[kMaxStackBuffer];
+  uint32_t size = GetSize(fd);
+  if (size < kMaxStackBuffer) {
+    GetData(fd, buffer, size);
+    *value = std::string(reinterpret_cast<char *>(buffer), size);
+  } else {
+    std::unique_ptr<uint8_t[]> tmp(new uint8_t[size]);
+    GetData(fd, tmp.get(), size);
+    *value = std::string(reinterpret_cast<char *>(tmp.get()), size);
+  }
+}
+
+void GetValue(int fd, Value *value) {
+  uint8_t type = 0;
+  GetData(fd, &type, sizeof(type));
+  if (type == kTypeNone) {
+    *value = Value();
+  } else if (type == kTypeBoolFalse) {
+    *value = Value(false);
+  } else if (type == kTypeBoolTrue) {
+    *value = Value(true);
+  } else if (type == kTypeInt) {
+    int64_t tmp = 0;
+    GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
+    *value = Value(tmp);
+  } else if (type == kTypeFloat) {
+    double tmp = 0.0;
+    GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
+    *value = Value(tmp);
+  } else if (type == kTypeStr) {
+    std::string tmp;
+    GetString(fd, &tmp);
+    *value = Value(tmp);
+  } else if (type == kTypeList) {
+    std::vector<Value> tmp_vec;
+    uint32_t size = GetSize(fd);
+    tmp_vec.reserve(size);
+    for (uint32_t i = 0; i < size; ++i) {
+      Value tmp_value;
+      GetValue(fd, &tmp_value);
+      tmp_vec.push_back(tmp_value);
+    }
+    *value = Value(tmp_vec);
+  } else if (type == kTypeDict) {
+    std::map<std::string, Value> tmp_map;
+    uint32_t size = GetSize(fd);
+    for (uint32_t i = 0; i < size; ++i) {
+      std::string tmp_key;
+      Value tmp_value;
+      GetString(fd, &tmp_key);
+      GetValue(fd, &tmp_value);
+      tmp_map.insert({tmp_key, tmp_value});
+    }
+    *value = Value(tmp_map);
+  } else {
+    throw TransformExecutionException(
+        fmt::format("Couldn't get value of unsupported type 0x{:02x}!", type));
+  }
+}
+
+}  // namespace
+
+namespace integrations::kafka {
 
 Transform::Transform(const std::string &transform_script_path)
     : transform_script_path_(transform_script_path) {}
 
-std::vector<std::string> Transform::Apply(
-    const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {
-  // TODO (msantl): dummy transform, do the actual transform later @mferencevic
-  std::vector<std::string> transformed_batch;
-  transformed_batch.reserve(batch.size());
-  for (auto &record : batch) {
-    transformed_batch.push_back(reinterpret_cast<char *>(record->payload()));
+bool Transform::Start() {
+  // Setup communication pipes.
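+  // (Both pipes are created with O_CLOEXEC so stray descriptor copies can't
+  // leak across `execve`; the two ends handed to the Python process are
+  // re-created without the flag by `dup2` in `Target`, because descriptors
+  // duplicated with `dup2` don't inherit close-on-exec.)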
+  if (pipe2(pipe_to_python_, O_CLOEXEC) != 0) {
+    DLOG(ERROR) << "Couldn't create communication pipe from cpp to python!";
+    return false;
+  }
+  if (pipe2(pipe_from_python_, O_CLOEXEC) != 0) {
+    DLOG(ERROR) << "Couldn't create communication pipe from python to cpp!";
+    return false;
   }
 
-  return transformed_batch;
+  // Find the top of the stack.
+  uint8_t *stack_top = stack_.get() + kStackSizeBytes;
+
+  // Set the target arguments.
+  target_arguments_->transform_script_path = transform_script_path_;
+  target_arguments_->pipe_to_python = pipe_to_python_[kPipeReadEnd];
+  target_arguments_->pipe_from_python = pipe_from_python_[kPipeWriteEnd];
+
+  // Create the process.
+  pid_ = clone(Target, stack_top, CLONE_VFORK, target_arguments_.get());
+  if (pid_ == -1) {
+    DLOG(ERROR) << "Couldn't create the communication process!";
+    return false;
+  }
+
+  // Close pipes that won't be used from the master process.
+  close(pipe_to_python_[kPipeReadEnd]);
+  close(pipe_from_python_[kPipeWriteEnd]);
+
+  return true;
 }
 
-}  // namespace kafka
-}  // namespace integrations
+void Transform::Apply(
+    const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
+    std::function<void(
+        const std::string &,
+        const std::map<std::string, communication::bolt::Value> &)>
+        query_function) {
+  // Check that the process is alive.
+  if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) != 0) {
+    throw TransformExecutionException("The transform process has died!");
+  }
+
+  // Put the `batch` data to the transform process.
+  PutSize(pipe_to_python_[kPipeWriteEnd], batch.size());
+  for (const auto &item : batch) {
+    PutSize(pipe_to_python_[kPipeWriteEnd], item->len());
+    PutData(pipe_to_python_[kPipeWriteEnd],
+            reinterpret_cast<const uint8_t *>(item->payload()), item->len());
+  }
+
+  // Get `query` and `params` data from the transform process.
+  uint32_t size = GetSize(pipe_from_python_[kPipeReadEnd]);
+  for (uint32_t i = 0; i < size; ++i) {
+    std::string query;
+    Value params;
+    GetString(pipe_from_python_[kPipeReadEnd], &query);
+    GetValue(pipe_from_python_[kPipeReadEnd], &params);
+    query_function(query, params.ValueMap());
+  }
+}
+
+Transform::~Transform() {
+  // Try to terminate the process gracefully within `kTerminateTimeoutSec`
+  // seconds.
+  std::this_thread::sleep_for(std::chrono::milliseconds(100));
+  for (int i = 0; i < kTerminateTimeoutSec * 10; ++i) {
+    DLOG(INFO) << "Terminating the transform process with pid " << pid_;
+    kill(pid_, SIGTERM);
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    int ret = waitpid(pid_, &status_, WNOHANG | WUNTRACED);
+    if (ret == pid_ || ret == -1) {
+      break;
+    }
+  }
+
+  // If the process is still alive, kill it and wait for it to die.
+  if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) == 0) {
+    DLOG(WARNING) << "Killing the transform process with pid " << pid_;
+    kill(pid_, SIGKILL);
+    waitpid(pid_, &status_, 0);
+  }
+
+  // Delete the working directory.
+  if (pid_ != -1) {
+    utils::DeleteDir(GetTemporaryPath(pid_));
+  }
+
+  // Close leftover open pipes. We have to be careful to close only the
+  // leftover open pipes (the pipe_to_python WriteEnd and the pipe_from_python
+  // ReadEnd); the other two ends were closed in the function that created
+  // them because they aren't used from the master process (they are only
+  // used by the Python process).
+  close(pipe_to_python_[kPipeWriteEnd]);
+  close(pipe_from_python_[kPipeReadEnd]);
+}
+
+}  // namespace integrations::kafka
diff --git a/src/integrations/kafka/transform.hpp b/src/integrations/kafka/transform.hpp
index 6a632ef72..57012373c 100644
--- a/src/integrations/kafka/transform.hpp
+++ b/src/integrations/kafka/transform.hpp
@@ -1,26 +1,48 @@
 #pragma once
 
-#include <memory>
+#include <experimental/filesystem>
+#include <functional>
 #include <string>
-#include <vector>
 
 #include "rdkafkacpp.h"
 
-namespace integrations {
-namespace kafka {
+#include "communication/bolt/v1/value.hpp"
+
+namespace integrations::kafka {
+
+struct TargetArguments {
+  std::experimental::filesystem::path transform_script_path;
+  int pipe_to_python{-1};
+  int pipe_from_python{-1};
+};
 
 class Transform final {
+ private:
+  const int kStackSizeBytes = 262144;
+
  public:
-  Transform(const std::string &transform_script_path);
+  explicit Transform(const std::string &transform_script_path);
 
-  std::vector<std::string> Apply(
-      const std::vector<std::unique_ptr<RdKafka::Message>> &batch);
+  bool Start();
 
-  auto transform_script_path() const { return transform_script_path_; }
+  void Apply(const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
+             std::function<void(
+                 const std::string &,
+                 const std::map<std::string, communication::bolt::Value> &)>
+                 query_function);
+
+  ~Transform();
 
  private:
   std::string transform_script_path_;
+  pid_t pid_{-1};
+  int status_{0};
+  // The stack used for the `clone` system call must be heap allocated.
+  std::unique_ptr<uint8_t[]> stack_{new uint8_t[kStackSizeBytes]};
+  // The target arguments passed to the new process must be heap allocated.
+  std::unique_ptr<TargetArguments> target_arguments_{new TargetArguments()};
+  int pipe_to_python_[2] = {-1, -1};
+  int pipe_from_python_[2] = {-1, -1};
 };
 
-}  // namespace kafka
-}  // namespace integrations
+}  // namespace integrations::kafka
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index 23edd25be..62a969a4f 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -279,17 +279,22 @@ void SingleNodeMain() {
   SessionData session_data{db};
 
   auto stream_writer =
-      [&session_data](const std::vector<std::string> &queries) {
-        for (auto &query : queries) {
-          auto dba = session_data.db.Access();
-          KafkaResultStream stream;
-          try {
-            session_data.interpreter(query, *dba, {}, false).PullAll(stream);
-            dba->Commit();
-          } catch (const query::QueryException &e) {
-            LOG(ERROR) << e.what();
-            dba->Abort();
-          }
+      [&session_data](
+          const std::string &query,
+          const std::map<std::string, communication::bolt::Value> &params) {
+        auto dba = session_data.db.Access();
+        KafkaResultStream stream;
+        std::map<std::string, query::TypedValue> params_tv;
+        for (const auto &kv : params)
+          params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
+        try {
+          session_data.interpreter(query, *dba, params_tv, false)
+              .PullAll(stream);
+          dba->Commit();
+        } catch (const query::QueryException &e) {
+          LOG(WARNING) << "[Kafka] query execution failed with an exception: "
+                       << e.what();
+          dba->Abort();
         }
       };
 
@@ -370,17 +375,22 @@ void MasterMain() {
   SessionData session_data{db};
 
   auto stream_writer =
-      [&session_data](const std::vector<std::string> &queries) {
-        for (auto &query : queries) {
-          auto dba = session_data.db.Access();
-          KafkaResultStream stream;
-          try {
-            session_data.interpreter(query, *dba, {}, false).PullAll(stream);
-            dba->Commit();
-          } catch (const query::QueryException &e) {
-            LOG(ERROR) << e.what();
-            dba->Abort();
-          }
+      [&session_data](
+          const std::string &query,
+          const std::map<std::string, communication::bolt::Value> &params) {
+        auto dba = session_data.db.Access();
+        KafkaResultStream stream;
+        std::map<std::string, query::TypedValue> params_tv;
+        for (const auto &kv : params)
+          params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
+        try {
+          session_data.interpreter(query, *dba, params_tv, false)
+              .PullAll(stream);
+          dba->Commit();
+        } catch (const query::QueryException &e) {
+          LOG(WARNING) << "[Kafka] query execution failed with an exception: "
+                       << e.what();
+          dba->Abort();
         }
       };
 
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index df840cb85..11c029b6c 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -21,6 +21,7 @@
 #include "distributed/pull_rpc_clients.hpp"
 #include "distributed/updates_rpc_clients.hpp"
 #include "distributed/updates_rpc_server.hpp"
+#include "glue/conversion.hpp"
 #include "integrations/kafka/exceptions.hpp"
 #include "integrations/kafka/streams.hpp"
 #include "query/context.hpp"
@@ -4152,7 +4153,7 @@ class ShowStreamsCursor : public Cursor {
     frame[self_.uri_symbol()] = streams_it_->stream_uri;
     frame[self_.topic_symbol()] = streams_it_->stream_topic;
     frame[self_.transform_symbol()] = streams_it_->transform_uri;
-    frame[self_.status_symbol()] = streams_it_->is_running;
+    frame[self_.status_symbol()] = streams_it_->stream_status;
 
     streams_it_++;
 
@@ -4165,9 +4166,9 @@ class ShowStreamsCursor : public Cursor {
   const ShowStreams &self_;
   bool is_initialized_ = false;
 
-  using StreamInfo = integrations::kafka::StreamInfo;
-  std::vector<StreamInfo> streams_;
-  std::vector<StreamInfo>::iterator streams_it_ = streams_.begin();
+  using StreamStatus = integrations::kafka::StreamStatus;
+  std::vector<StreamStatus> streams_;
+  std::vector<StreamStatus>::iterator streams_it_ = streams_.begin();
 };
 
 std::unique_ptr<Cursor> ShowStreams::MakeCursor(
@@ -4267,15 +4268,16 @@ std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(
 }
 
 TestStream::TestStream(std::string stream_name, Expression *limit_batches,
-                       Symbol test_result_symbol)
+                       Symbol query_symbol, Symbol params_symbol)
     : stream_name_(stream_name),
       limit_batches_(limit_batches),
-      test_result_symbol_(test_result_symbol) {}
+      query_symbol_(query_symbol),
+      params_symbol_(params_symbol) {}
 
 WITHOUT_SINGLE_INPUT(TestStream)
 
 std::vector<Symbol> TestStream::OutputSymbols(const SymbolTable &) const {
-  return {test_result_symbol_};
+  return {query_symbol_, params_symbol_};
 }
 
 class TestStreamCursor : public Cursor {
@@ -4298,7 +4300,15 @@ class TestStreamCursor : public Cursor {
       }
 
       try {
-        results_ = ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
+        auto results =
+            ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
+        for (const auto &result : results) {
+          std::map<std::string, TypedValue> params_tv;
+          for (const auto &kv : result.second) {
+            params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
+          }
+          results_.emplace_back(result.first, params_tv);
+        }
       } catch (const integrations::kafka::KafkaStreamException &e) {
         throw QueryRuntimeException(e.what());
       }
@@ -4308,7 +4318,8 @@ class TestStreamCursor : public Cursor {
 
     if (results_it_ == results_.end()) return false;
 
-    frame[self_.test_result_symbol()] = *results_it_;
+    frame[self_.query_symbol()] = results_it_->first;
+    frame[self_.params_symbol()] = results_it_->second;
 
     results_it_++;
 
     return true;
@@ -4320,8 +4331,9 @@ class TestStreamCursor : public Cursor {
   const TestStream &self_;
   bool is_initialized_ = false;
 
-  std::vector<std::string> results_;
-  std::vector<std::string>::iterator results_it_ = results_.begin();
+  std::vector<std::pair<std::string, std::map<std::string, TypedValue>>>
+      results_;
+  std::vector<std::pair<std::string, std::map<std::string, TypedValue>>>::iterator
+      results_it_ = results_.begin();
 };
 
 std::unique_ptr<Cursor> TestStream::MakeCursor(
diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp
index c558d9da0..e4cf8d4f9 100644
--- a/src/query/plan/operator.lcp
+++ b/src/query/plan/operator.lcp
@@ -2570,14 +2570,15 @@ stream is importing.")
    :capnp-init nil
    :capnp-save #'save-ast-pointer :capnp-load (load-ast-pointer "Expression *"))
-  (test-result-symbol "Symbol" :reader t))
+  (query-symbol "Symbol" :reader t)
+  (params-symbol "Symbol" :reader t))
   (:documentation
    "Test a stream. This will start consuming messages but won't insert
 anything in the db.")
   (:public
    #>cpp
    TestStream(std::string stream_name, Expression *limit_batches,
-              Symbol test_result_symbol);
+              Symbol query_symbol, Symbol params_symbol);
 
    DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
    std::unique_ptr<Cursor> MakeCursor(
        database::GraphDbAccessor &db) const override;
diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp
index d1e2c8794..65a47a668 100644
--- a/src/query/plan/rule_based_planner.hpp
+++ b/src/query/plan/rule_based_planner.hpp
@@ -212,7 +212,7 @@ class RuleBasedPlanner {
             symbol_table.CreateSymbol("uri", false),
             symbol_table.CreateSymbol("topic", false),
             symbol_table.CreateSymbol("transform", false),
-            symbol_table.CreateSymbol("is_running", false));
+            symbol_table.CreateSymbol("status", false));
       } else if (auto *start_stop_stream =
                      dynamic_cast<StartStopStream *>(clause)) {
         DCHECK(!input_op) << "Unexpected operator before StartStopStream";
@@ -230,7 +230,8 @@ class RuleBasedPlanner {
         auto &symbol_table = context.symbol_table;
         input_op = std::make_unique<TestStream>(
             test_stream->stream_name_, test_stream->limit_batches_,
-            symbol_table.CreateSymbol("test_result", false));
+            symbol_table.CreateSymbol("query", false),
+            symbol_table.CreateSymbol("params", false));
       } else {
         throw utils::NotYetImplemented("clause conversion to operator(s)");
       }
diff --git a/src/utils/file.cpp b/src/utils/file.cpp
index 87b328982..427c09e3f 100644
--- a/src/utils/file.cpp
+++ b/src/utils/file.cpp
@@ -81,6 +81,17 @@ void CheckDir(const std::string &dir) {
   }
 }
 
+bool DeleteDir(const fs::path &dir) {
+  if (!fs::exists(dir)) return true;
+  std::error_code error_code;  // Just for exception suppression.
+  return fs::remove_all(dir, error_code) != static_cast<uintmax_t>(-1);
+}
+
+bool CopyFile(const fs::path &src, const fs::path &dst) {
+  std::error_code error_code;  // Just for exception suppression.
+  return fs::copy_file(src, dst, error_code);
+}
+
 File::File() : fd_(-1), path_() {}
 
 File::File(int fd, fs::path path) : fd_(fd), path_(std::move(path)) {}
diff --git a/src/utils/file.hpp b/src/utils/file.hpp
index 7751a30f5..e1e79fa29 100644
--- a/src/utils/file.hpp
+++ b/src/utils/file.hpp
@@ -59,6 +59,17 @@ bool EnsureDir(const std::experimental::filesystem::path &dir);
  */
 void CheckDir(const std::string &dir);
 
+/**
+ * Deletes everything from the given dir, including the dir itself.
+ */
+bool DeleteDir(const std::experimental::filesystem::path &dir);
+
+/**
+ * Copies the file from src to dst.
+ */
+bool CopyFile(const std::experimental::filesystem::path &src,
+              const std::experimental::filesystem::path &dst);
+
 // End higher level operations.
 
 // Lower level wrappers around C system calls follow.
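For reference, the framing written by `kafka.py` and read back by the
`GetSize`/`GetString`/`GetValue` helpers in `transform.cpp` can be reproduced
in a few lines of Python. This only illustrates the byte layout of a single
transformed item (the query and parameter are made up, and native-endian
`struct` packing is assumed, matching `kafka.py`):

    import struct

    # One batch result: [("CREATE (n {val: $val})", {"val": 1})]
    query = "CREATE (n {val: $val})".encode("utf-8")

    frame = struct.pack("I", 1)                    # number of (query, params) pairs
    frame += struct.pack("I", len(query)) + query  # uint32_t query size + query data
    frame += struct.pack("B", 0x70)                # TYPE_DICT opens the params
    frame += struct.pack("I", 1)                   # the dict has one key/value pair
    frame += struct.pack("I", 3) + b"val"          # uint32_t key size + key data
    frame += struct.pack("B", 0x30)                # TYPE_INT
    frame += struct.pack("q", 1)                   # int64_t value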