Implement Kafka Python transform

Summary:
The Kafka Python transform uses a user-supplied Python script to transform
incoming Kafka data into queries and parameters that are executed against the
database. The transform script is started in a sandboxed environment so that
it can't harm the host system or the database.
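
For illustration, here is a minimal sketch of a transform script (the payload
format and the query are hypothetical). The contract comes from the kafka.py
helper added below: it calls transform.stream(batch) with a list of raw
message payloads (bytes) and expects back a list of (query, params) pairs,
where each query is a string and each params object is a dict with string
keys whose values are None, bool, int, float, str, list or dict:

    import json

    def stream(batch):
        # `batch` is a list of raw Kafka message payloads (bytes).
        ret = []
        for item in batch:
            # Hypothetical payload: a JSON object such as {"id": 42}.
            data = json.loads(item.decode("utf-8"))
            ret.append(("CREATE (:Node {id: $id})", {"id": data["id"]}))
        return ret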

Reviewers: msantl, teon.banek

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1509
Matej Ferencevic 2018-08-06 13:05:42 +02:00
parent 3d1e5f2ebd
commit ce306a4c21
17 changed files with 1176 additions and 109 deletions

init
View File

@@ -6,6 +6,7 @@ required_pkgs=(git arcanist # source code control
uuid-dev default-jre-headless # required by antlr
libreadline-dev # for memgraph console
libssl-dev
libseccomp-dev
python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests
uuid-dev # mg-utils
libcurl4-openssl-dev # mg-requests

View File

@@ -3,7 +3,18 @@ set(integrations_src_files
kafka/transform.cpp
kafka/consumer.cpp)
include(FindSeccomp.cmake)
if (NOT SECCOMP_FOUND)
message(FATAL_ERROR "Couldn't find seccomp library!")
endif()
add_library(mg-integrations STATIC ${integrations_src_files})
target_link_libraries(mg-integrations stdc++fs Threads::Threads fmt
glog gflags librdkafka++ librdkafka zlib json)
target_link_libraries(mg-integrations mg-utils mg-requests)
target_link_libraries(mg-integrations mg-utils mg-requests mg-communication)
target_link_libraries(mg-integrations ${Seccomp_LIBRARIES})
target_include_directories(mg-integrations SYSTEM PUBLIC ${Seccomp_INCLUDE_DIRS})
# Copy kafka.py to the root of our build directory, where the memgraph executable should be
configure_file(kafka/kafka.py ${CMAKE_BINARY_DIR} COPYONLY)

View File

@@ -0,0 +1,90 @@
#.rst:
# FindSeccomp
# -----------
#
# Try to locate the libseccomp library.
# If found, this will define the following variables:
#
# ``Seccomp_FOUND``
# True if the seccomp library is available
# ``Seccomp_INCLUDE_DIRS``
# The seccomp include directories
# ``Seccomp_LIBRARIES``
# The seccomp libraries for linking
#
# If ``Seccomp_FOUND`` is TRUE, it will also define the following
# imported target:
#
# ``Seccomp::Seccomp``
# The Seccomp library
#
# Since 5.44.0.
#=============================================================================
# Copyright (c) 2017 Martin Flöser <mgraesslin@kde.org>
# Copyright (c) 2017 David Kahles <david.kahles96@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# 3. The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
# NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#=============================================================================
find_package(PkgConfig QUIET)
pkg_check_modules(PKG_Libseccomp QUIET libseccomp)
find_path(Seccomp_INCLUDE_DIRS
NAMES
seccomp.h
HINTS
${PKG_Libseccomp_INCLUDE_DIRS}
)
find_library(Seccomp_LIBRARIES
NAMES
seccomp
HINTS
${PKG_Libseccomp_LIBRARY_DIRS}
)
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(Seccomp
FOUND_VAR
Seccomp_FOUND
REQUIRED_VARS
Seccomp_LIBRARIES
Seccomp_INCLUDE_DIRS
)
if (Seccomp_FOUND AND NOT TARGET Seccomp::Seccomp)
add_library(Seccomp::Seccomp UNKNOWN IMPORTED)
set_target_properties(Seccomp::Seccomp PROPERTIES
IMPORTED_LOCATION "${Seccomp_LIBRARIES}"
INTERFACE_INCLUDE_DIRECTORIES "${Seccomp_INCLUDE_DIRS}"
)
endif()
mark_as_advanced(Seccomp_LIBRARIES Seccomp_INCLUDE_DIRS)
include(FeatureSummary)
set_package_properties(Seccomp PROPERTIES
URL "https://github.com/seccomp/libseccomp"
DESCRIPTION "The enhanced seccomp library."
)

View File

@@ -7,9 +7,9 @@
#include "integrations/kafka/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/thread.hpp"
namespace integrations {
namespace kafka {
namespace integrations::kafka {
using namespace std::chrono_literals;
@@ -30,10 +30,13 @@ void Consumer::event_cb(RdKafka::Event &event) {
Consumer::Consumer(
const StreamInfo &info, const std::string &transform_script_path,
std::function<void(const std::vector<std::string> &)> stream_writer)
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer)
: info_(info),
stream_writer_(stream_writer),
transform_(transform_script_path) {
transform_script_path_(transform_script_path),
stream_writer_(stream_writer) {
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::string error;
@@ -112,13 +115,41 @@ void Consumer::StartConsuming(
is_running_.store(true);
thread_ = std::thread([this, limit_batches]() {
utils::ThreadSetName("StreamKafka");
int64_t batch_count = 0;
Transform transform(transform_script_path_);
transform_alive_.store(false);
if (!transform.Start()) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " couldn't start the transform script!";
return;
}
transform_alive_.store(true);
while (is_running_) {
// TODO (msantl): Figure out what to do with potential exceptions here.
auto batch = this->GetBatch();
auto transformed_batch = transform_.Apply(batch);
stream_writer_(transformed_batch);
if (batch.empty()) continue;
// All exceptions that could possibly be thrown by the `Apply` function
// must be handled here because they *will* crash the database if
// uncaught!
// TODO (mferencevic): Figure out what to do with all other exceptions.
try {
transform.Apply(batch, stream_writer_);
} catch (const TransformExecutionException &) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process has died!";
break;
} catch (const utils::BasicException &e) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process received an exception: "
<< e.what();
break;
}
if (limit_batches != std::experimental::nullopt) {
if (limit_batches <= ++batch_count) {
@@ -127,6 +158,8 @@ }
}
}
}
transform_alive_.store(false);
});
}
@@ -153,7 +186,8 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
break;
default:
LOG(ERROR) << "[Kafka] Consumer error: " << msg->errstr();
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " consumer error: " << msg->errstr();
run_batch = false;
is_running_.store(false);
break;
@@ -217,8 +251,10 @@ void Consumer::StopIfRunning() {
}
}
std::vector<std::string> Consumer::Test(
std::experimental::optional<int64_t> limit_batches) {
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Consumer::Test(std::experimental::optional<int64_t> limit_batches) {
// All exceptions thrown here are handled by the Bolt protocol.
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
@@ -227,29 +263,71 @@ std::vector<std::string> Consumer::Test(
throw ConsumerRunningException(info_.stream_name);
}
Transform transform(transform_script_path_);
int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
std::vector<std::string> results;
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
results;
is_running_.store(true);
utils::OnScopeExit cleanup([this]() { is_running_.store(false); });
transform_alive_.store(false);
if (!transform.Start()) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " couldn't start the transform script!";
throw TransformExecutionException("Couldn't start the transform script!");
}
transform_alive_.store(true);
for (int64_t i = 0; i < num_of_batches; ++i) {
auto batch = GetBatch();
auto transformed_batch = transform_.Apply(batch);
for (auto &record : transformed_batch) {
results.emplace_back(std::move(record));
// Exceptions thrown by `Apply` are handled in Bolt.
// Wrap the `TransformExecutionException` into a new exception with a less
// specific message so the user doesn't get confused.
try {
transform.Apply(
batch,
[&results](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
results.push_back({query, params});
});
} catch (const TransformExecutionException &) {
LOG(WARNING) << "[Kafka] stream " << info_.stream_name
<< " the transform process has died!";
throw TransformExecutionException(
"The transform script contains a runtime error!");
}
}
transform_alive_.store(false);
return results;
}
StreamStatus Consumer::Status() {
StreamStatus ret;
ret.stream_name = info_.stream_name;
ret.stream_uri = info_.stream_uri;
ret.stream_topic = info_.stream_topic;
ret.transform_uri = info_.transform_uri;
if (!is_running_) {
ret.stream_status = "stopped";
} else if (!transform_alive_) {
ret.stream_status = "error";
} else {
ret.stream_status = "running";
}
return ret;
}
StreamInfo Consumer::info() {
info_.is_running = is_running_;
return info_;
}
} // namespace kafka
} // namespace integrations
} // namespace integrations::kafka

View File

@@ -10,6 +10,7 @@
#include "rdkafkacpp.h"
#include "communication/bolt/v1/value.hpp"
#include "integrations/kafka/transform.hpp"
namespace integrations {
@@ -28,12 +29,23 @@ struct StreamInfo {
bool is_running = false;
};
struct StreamStatus {
std::string stream_name;
std::string stream_uri;
std::string stream_topic;
std::string transform_uri;
std::string stream_status;
};
class Consumer final : public RdKafka::EventCb {
public:
Consumer() = delete;
Consumer(const StreamInfo &info, const std::string &transform_script_path,
std::function<void(const std::vector<std::string> &)> stream_writer);
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer);
Consumer(const Consumer &other) = delete;
Consumer(Consumer &&other) = delete;
@@ -49,19 +61,23 @@ class Consumer final : public RdKafka::EventCb {
void StopIfRunning();
std::vector<std::string> Test(
std::experimental::optional<int64_t> limit_batches);
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(std::experimental::optional<int64_t> limit_batches);
StreamStatus Status();
StreamInfo info();
private:
StreamInfo info_;
std::string transform_script_path_;
std::function<void(const std::vector<std::string> &)> stream_writer_;
Transform transform_;
std::function<void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer_;
std::atomic<bool> is_running_{false};
std::atomic<bool> transform_alive_{false};
std::thread thread_;
std::unique_ptr<RdKafka::KafkaConsumer,

View File

@@ -4,8 +4,8 @@
#include <fmt/format.h>
namespace integrations {
namespace kafka {
namespace integrations::kafka {
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
@@ -117,5 +117,8 @@ class TransformScriptCouldNotBeDeletedException : public KafkaStreamException {
"Couldn't delete transform script for stream {}", stream_name)) {}
};
} // namespace kafka
} // namespace integrations
class TransformExecutionException : public KafkaStreamException {
using KafkaStreamException::KafkaStreamException;
};
} // namespace integrations::kafka

View File

@@ -0,0 +1,162 @@
#!/usr/bin/python3
import os
import struct

# Import the target transform script.
import transform

# Constants used for communicating with the memgraph process.
COMMUNICATION_TO_PYTHON_FD = 1000
COMMUNICATION_FROM_PYTHON_FD = 1002


# Functions used to get data from the memgraph process.

def get_data(num_bytes):
    data = bytes()
    while len(data) < num_bytes:
        data += os.read(COMMUNICATION_TO_PYTHON_FD, num_bytes - len(data))
    return data


def get_size():
    fmt = "I"  # uint32_t
    data = get_data(struct.calcsize(fmt))
    return struct.unpack(fmt, data)[0]


def get_batch():
    batch = []
    count = get_size()
    for i in range(count):
        size = get_size()
        batch.append(get_data(size))
    return batch


# Functions used to put data to the memgraph process.

TYPE_NONE = 0x10
TYPE_BOOL_FALSE = 0x20
TYPE_BOOL_TRUE = 0x21
TYPE_INT = 0x30
TYPE_FLOAT = 0x40
TYPE_STR = 0x50
TYPE_LIST = 0x60
TYPE_DICT = 0x70


def put_data(data):
    written = 0
    while written < len(data):
        written += os.write(COMMUNICATION_FROM_PYTHON_FD, data[written:])


def put_size(size):
    fmt = "I"  # uint32_t
    put_data(struct.pack(fmt, size))


def put_type(typ):
    fmt = "B"  # uint8_t
    put_data(struct.pack(fmt, typ))


def put_string(value):
    data = value.encode("utf-8")
    put_size(len(data))
    put_data(data)


def put_value(value, ids):
    if value is None:
        put_type(TYPE_NONE)
    elif type(value) is bool:
        if value:
            put_type(TYPE_BOOL_TRUE)
        else:
            put_type(TYPE_BOOL_FALSE)
    elif type(value) is int:
        put_type(TYPE_INT)
        put_data(struct.pack("q", value))  # int64_t
    elif type(value) is float:
        put_type(TYPE_FLOAT)
        put_data(struct.pack("d", value))  # double
    elif type(value) is str:
        put_type(TYPE_STR)
        put_string(value)
    elif type(value) is list:
        if id(value) in ids:
            raise ValueError("Recursive objects are not supported!")
        ids_new = ids + [id(value)]
        put_type(TYPE_LIST)
        put_size(len(value))
        for item in value:
            put_value(item, ids_new)
    elif type(value) is dict:
        if id(value) in ids:
            raise ValueError("Recursive objects are not supported!")
        ids_new = ids + [id(value)]
        put_type(TYPE_DICT)
        put_size(len(value))
        for key, item in value.items():
            if type(key) is not str:
                raise TypeError("Dictionary keys must be strings!")
            put_string(key)
            put_value(item, ids_new)
    else:
        raise TypeError("Unsupported value type {}!".format(str(type(value))))


# Functions used to continuously process data.

def put_params(params):
    if type(params) != dict:
        raise TypeError("Parameters must be a dict!")
    put_value(params, [])


class StreamError(Exception):
    pass


def process_batch():
    # Get the data that should be transformed.
    batch = get_batch()

    # Transform the data.
    ret = transform.stream(batch)

    # Sanity checks for the transformed data.
    if type(ret) != list:
        raise StreamError("The transformed items must be a list!")
    for item in ret:
        if type(item) not in [list, tuple]:
            raise StreamError("The transformed item must be a tuple "
                              "or a list!")
        if len(item) != 2:
            raise StreamError("There must be exactly two elements in the "
                              "transformed item!")
        if type(item[0]) != str:
            raise StreamError("The first transformed element of an item "
                              "must be a string!")
        if type(item[1]) != dict:
            raise StreamError("The second transformed element of an item "
                              "must be a dictionary!")

    # Send the items to the server.
    put_size(len(ret))
    for query, params in ret:
        put_string(query)
        put_params(params)


# Main entry point.
if __name__ == "__main__":
    while True:
        process_batch()
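
The memgraph-to-Python direction uses a simple length-prefixed framing: a
uint32_t element count, then a uint32_t size and the raw bytes for each
element. As a standalone sketch (made-up payloads, an ordinary os.pipe()
instead of the inherited descriptors), the following round-trips a batch the
same way memgraph's PutSize()/PutData() feed get_batch() above:

    import os
    import struct

    def frame_batch(batch):
        # uint32_t element count, then a uint32_t size and the raw bytes
        # for each element, matching PutSize()/PutData() in transform.cpp.
        out = struct.pack("I", len(batch))
        for item in batch:
            out += struct.pack("I", len(item)) + item
        return out

    def read_exactly(fd, num_bytes):
        data = bytes()
        while len(data) < num_bytes:
            data += os.read(fd, num_bytes - len(data))
        return data

    def read_batch(fd):
        # The mirror image of get_batch() above.
        batch = []
        count = struct.unpack("I", read_exactly(fd, 4))[0]
        for _ in range(count):
            size = struct.unpack("I", read_exactly(fd, 4))[0]
            batch.append(read_exactly(fd, size))
        return batch

    r, w = os.pipe()
    os.write(w, frame_batch([b'{"id": 1}', b'{"id": 2}']))
    assert read_batch(r) == [b'{"id": 1}', b'{"id": 2}']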

View File

@@ -10,8 +10,7 @@
#include "requests/requests.hpp"
#include "utils/file.hpp"
namespace integrations {
namespace kafka {
namespace integrations::kafka {
namespace fs = std::experimental::filesystem;
@@ -101,9 +100,11 @@ StreamInfo Deserialize(const nlohmann::json &data) {
} // namespace
Streams::Streams(
const std::string &streams_directory,
std::function<void(const std::vector<std::string> &)> stream_writer)
Streams::Streams(const std::string &streams_directory,
std::function<void(
const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer)
: streams_directory_(streams_directory),
stream_writer_(stream_writer),
metadata_store_(fs::path(streams_directory) / kMetadataDir) {}
@@ -240,19 +241,20 @@ void Streams::StopAll() {
}
}
std::vector<StreamInfo> Streams::Show() {
std::vector<StreamInfo> streams;
std::vector<StreamStatus> Streams::Show() {
std::vector<StreamStatus> streams;
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
streams.emplace_back(consumer_kv.second.info());
streams.emplace_back(consumer_kv.second.Status());
}
return streams;
}
std::vector<std::string> Streams::Test(
const std::string &stream_name,
std::experimental::optional<int64_t> limit_batches) {
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Streams::Test(const std::string &stream_name,
std::experimental::optional<int64_t> limit_batches) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
@@ -269,5 +271,4 @@ std::string Streams::GetTransformScriptPath(const std::string &stream_name) {
return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt);
}
} // namespace kafka
} // namespace integrations
} // namespace integrations::kafka

View File

@@ -8,13 +8,15 @@
#include "storage/kvstore.hpp"
namespace integrations {
namespace kafka {
namespace integrations::kafka {
class Streams final {
public:
Streams(const std::string &streams_directory,
std::function<void(const std::vector<std::string> &)> stream_writer);
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer);
void Recover();
@@ -32,15 +34,19 @@ class Streams final {
void StopAll();
std::vector<StreamInfo> Show();
std::vector<StreamStatus> Show();
std::vector<std::string> Test(const std::string &stream_name,
std::experimental::optional<int64_t>
batch_limit = std::experimental::nullopt);
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
private:
std::string streams_directory_;
std::function<void(const std::vector<std::string> &)> stream_writer_;
std::function<void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer_;
storage::KVStore metadata_store_;
@@ -51,5 +57,4 @@ class Streams final {
std::string GetTransformScriptPath(const std::string &stream_name);
};
} // namespace kafka
} // namespace integrations
} // namespace integrations::kafka

View File

@@ -1,22 +1,654 @@
#include "integrations/kafka/transform.hpp"
namespace integrations {
namespace kafka {
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <thread>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <linux/limits.h>
#include <pwd.h>
#include <sched.h>
#include <seccomp.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/v1/value.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "utils/file.hpp"
DEFINE_string(python_interpreter, "/usr/bin/python3",
"Path to the Python 3.x interpreter that should be used");
namespace {
///////////////////////
// Namespace shortcuts.
///////////////////////
using communication::bolt::Value;
using integrations::kafka::TargetArguments;
using integrations::kafka::TransformExecutionException;
namespace fs = std::experimental::filesystem;
/////////////////////////////////////////////////////////////////////////
// Constants used for starting and communicating with the target process.
/////////////////////////////////////////////////////////////////////////
const int kPipeReadEnd = 0;
const int kPipeWriteEnd = 1;
const int kCommunicationToPythonFd = 1000;
const int kCommunicationFromPythonFd = 1002;
const int kTerminateTimeoutSec = 5;
const std::string kHelperScriptName = "kafka.py";
const std::string kTransformScriptName = "transform.py";
////////////////////
// Helper functions.
////////////////////
fs::path GetTemporaryPath(pid_t pid) {
return fs::temp_directory_path() / "memgraph" /
fmt::format("transform_{}", pid);
}
fs::path GetHelperScriptPath() {
char path[PATH_MAX];
memset(path, 0, PATH_MAX);
auto ret = readlink("/proc/self/exe", path, PATH_MAX);
if (ret < 0) return "";
return fs::path() / std::string(dirname(path)) / kHelperScriptName;
}
std::string GetEnvironmentVariable(const std::string &name) {
char *value = secure_getenv(name.c_str());
if (value == nullptr) return "";
return {value};
}
///////////////////////////////////////////
// char** wrapper used for C library calls.
///////////////////////////////////////////
const int kCharppMaxElements = 20;
class CharPP final {
public:
CharPP() { memset(data_, 0, sizeof(char *) * kCharppMaxElements); }
~CharPP() {
for (size_t i = 0; i < size_; ++i) {
free(data_[i]);
}
}
CharPP(const CharPP &) = delete;
CharPP(CharPP &&) = delete;
CharPP &operator=(const CharPP &) = delete;
CharPP &operator=(CharPP &&) = delete;
void Add(const char *value) {
if (size_ == kCharppMaxElements) return;
int len = strlen(value);
char *item = (char *)malloc(sizeof(char) * (len + 1));
if (item == nullptr) return;
memcpy(item, value, len);
item[len] = 0;
data_[size_++] = item;
}
void Add(const std::string &value) { Add(value.c_str()); }
char **Get() { return data_; }
private:
char *data_[kCharppMaxElements];
size_t size_{0};
};
////////////////////////////////////
// Security functions and constants.
////////////////////////////////////
const std::vector<int> seccomp_syscalls_allowed = {
SCMP_SYS(read),
SCMP_SYS(write),
SCMP_SYS(close),
SCMP_SYS(stat),
SCMP_SYS(fstat),
SCMP_SYS(lstat),
SCMP_SYS(poll),
SCMP_SYS(lseek),
SCMP_SYS(mmap),
SCMP_SYS(mprotect),
SCMP_SYS(munmap),
SCMP_SYS(brk),
SCMP_SYS(rt_sigaction),
SCMP_SYS(rt_sigprocmask),
SCMP_SYS(rt_sigreturn),
SCMP_SYS(ioctl),
SCMP_SYS(pread64),
SCMP_SYS(pwrite64),
SCMP_SYS(readv),
SCMP_SYS(writev),
SCMP_SYS(access),
SCMP_SYS(select),
SCMP_SYS(mremap),
SCMP_SYS(msync),
SCMP_SYS(mincore),
SCMP_SYS(madvise),
SCMP_SYS(dup),
SCMP_SYS(dup2),
SCMP_SYS(pause),
SCMP_SYS(nanosleep),
SCMP_SYS(getpid),
SCMP_SYS(sendfile),
SCMP_SYS(execve),
SCMP_SYS(exit),
SCMP_SYS(uname),
SCMP_SYS(fcntl),
SCMP_SYS(fsync),
SCMP_SYS(fdatasync),
SCMP_SYS(getdents),
SCMP_SYS(getcwd),
SCMP_SYS(readlink),
SCMP_SYS(gettimeofday),
SCMP_SYS(getrlimit),
SCMP_SYS(getrusage),
SCMP_SYS(getuid),
SCMP_SYS(getgid),
SCMP_SYS(geteuid),
SCMP_SYS(getegid),
SCMP_SYS(getppid),
SCMP_SYS(getpgrp),
SCMP_SYS(rt_sigpending),
SCMP_SYS(rt_sigtimedwait),
SCMP_SYS(rt_sigsuspend),
SCMP_SYS(sched_setparam),
SCMP_SYS(mlock),
SCMP_SYS(munlock),
SCMP_SYS(mlockall),
SCMP_SYS(munlockall),
SCMP_SYS(arch_prctl),
SCMP_SYS(ioperm),
SCMP_SYS(time),
SCMP_SYS(futex),
SCMP_SYS(set_tid_address),
SCMP_SYS(clock_gettime),
SCMP_SYS(clock_getres),
SCMP_SYS(clock_nanosleep),
SCMP_SYS(exit_group),
SCMP_SYS(mbind),
SCMP_SYS(set_mempolicy),
SCMP_SYS(get_mempolicy),
SCMP_SYS(migrate_pages),
SCMP_SYS(openat),
SCMP_SYS(pselect6),
SCMP_SYS(ppoll),
SCMP_SYS(set_robust_list),
SCMP_SYS(get_robust_list),
SCMP_SYS(tee),
SCMP_SYS(move_pages),
SCMP_SYS(dup3),
SCMP_SYS(preadv),
SCMP_SYS(pwritev),
SCMP_SYS(getrandom),
SCMP_SYS(sigaltstack),
SCMP_SYS(gettid),
SCMP_SYS(tgkill),
SCMP_SYS(sysinfo),
};
bool SetupSeccomp() {
// Initialize the seccomp context.
scmp_filter_ctx ctx;
ctx = seccomp_init(SCMP_ACT_TRAP);
if (ctx == NULL) return false;
// First we deny access to the `open` system call called with `O_WRONLY`,
// `O_RDWR` and `O_CREAT`.
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY, O_WRONLY)) != 0) {
seccomp_release(ctx);
return false;
}
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_RDWR, O_RDWR)) != 0) {
seccomp_release(ctx);
return false;
}
if (seccomp_rule_add(ctx, SCMP_ACT_KILL, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_CREAT, O_CREAT)) != 0) {
seccomp_release(ctx);
return false;
}
// Now we allow the `open` system call without the blocked flags.
if (seccomp_rule_add(
ctx, SCMP_ACT_ALLOW, SCMP_SYS(open), 1,
SCMP_A1(SCMP_CMP_MASKED_EQ, O_WRONLY | O_RDWR | O_CREAT, 0)) != 0) {
seccomp_release(ctx);
return false;
}
// Add all general allow rules.
for (auto syscall_num : seccomp_syscalls_allowed) {
if (seccomp_rule_add(ctx, SCMP_ACT_ALLOW, syscall_num, 0) != 0) {
seccomp_release(ctx);
return false;
}
}
// Load the context for the current process.
auto ret = seccomp_load(ctx);
// Free the context and return success/failure.
seccomp_release(ctx);
return ret == 0;
}
bool SetLimit(int resource, rlim_t n) {
struct rlimit limit;
limit.rlim_cur = limit.rlim_max = n;
return setrlimit(resource, &limit) == 0;
}
///////////////////////////////////////////////////////
// Target function used to start the transform process.
///////////////////////////////////////////////////////
int Target(void *arg) {
// NOTE: (D)LOG shouldn't be used here because it wasn't initialized in this
// process and something really bad could happen.
// Get a pointer to the passed arguments.
TargetArguments *ta = reinterpret_cast<TargetArguments *>(arg);
// Redirect `stdin` to `/dev/null`.
int fd = open("/dev/null", O_RDONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDIN_FILENO) != STDIN_FILENO) {
return EXIT_FAILURE;
}
// Redirect `stdout` to `/dev/null`.
fd = open("/dev/null", O_WRONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDOUT_FILENO) != STDOUT_FILENO) {
return EXIT_FAILURE;
}
// Redirect `stderr` to `/dev/null`.
fd = open("/dev/null", O_WRONLY | O_CLOEXEC);
if (fd == -1) {
return EXIT_FAILURE;
}
if (dup2(fd, STDERR_FILENO) != STDERR_FILENO) {
return EXIT_FAILURE;
}
// Create the working directory.
fs::path working_path = GetTemporaryPath(getpid());
utils::DeleteDir(working_path);
if (!utils::EnsureDir(working_path)) {
return EXIT_FAILURE;
}
// Copy all scripts to the working directory.
if (!utils::CopyFile(GetHelperScriptPath(),
working_path / kHelperScriptName)) {
return EXIT_FAILURE;
}
if (!utils::CopyFile(ta->transform_script_path,
working_path / kTransformScriptName)) {
return EXIT_FAILURE;
}
// Change the current directory to the working directory.
if (chdir(working_path.c_str()) != 0) {
return EXIT_FAILURE;
}
// Create the executable CharPP object.
CharPP exe;
exe.Add(FLAGS_python_interpreter);
exe.Add(kHelperScriptName);
// Create the environment CharPP object.
CharPP env;
env.Add(fmt::format("PATH={}", GetEnvironmentVariable("PATH")));
// TODO (mferencevic): Change this to the effective user.
env.Add(fmt::format("USER={}", GetEnvironmentVariable("USER")));
env.Add(fmt::format("HOME={}", working_path));
env.Add("LANG=en_US.utf8");
env.Add("LANGUAGE=en_US:en");
env.Add("PYTHONUNBUFFERED=1");
env.Add("PYTHONIOENCODING=utf-8");
env.Add("PYTHONDONTWRITEBYTECODE=1");
// Connect the communication input pipe.
if (dup2(ta->pipe_to_python, kCommunicationToPythonFd) !=
kCommunicationToPythonFd) {
return EXIT_FAILURE;
}
// Connect the communication output pipe.
if (dup2(ta->pipe_from_python, kCommunicationFromPythonFd) !=
kCommunicationFromPythonFd) {
return EXIT_FAILURE;
}
// Set process limits.
// Disable core dumps.
if (!SetLimit(RLIMIT_CORE, 0)) {
return EXIT_FAILURE;
}
// Disable file creation.
if (!SetLimit(RLIMIT_FSIZE, 0)) {
return EXIT_FAILURE;
}
// Set process number limit.
if (!SetLimit(RLIMIT_NPROC, 0)) {
return EXIT_FAILURE;
}
// TODO (mferencevic): Change the user to `nobody`.
// Setup seccomp.
if (!SetupSeccomp()) {
return EXIT_FAILURE;
}
execve(*exe.Get(), exe.Get(), env.Get());
// TODO (mferencevic): Log an error with `errno` about what failed.
return EXIT_FAILURE;
}
/////////////////////////////////////////////////////////////
// Functions used to send data to the started Python process.
/////////////////////////////////////////////////////////////
/// The data that is being sent to the Python process is always a
/// `std::vector<uint8_t[]>` of data. It is sent in the following way:
///
/// uint32_t number of elements being sent
/// uint32_t element 0 size
/// uint8_t[] element 0 data
/// uint32_t element 1 size
/// uint8_t[] element 1 data
/// ...
///
/// The receiving end of the protocol is implemented in `kafka.py`.
void PutData(int fd, const uint8_t *data, uint32_t size) {
int ret = 0;
uint32_t put = 0;
while (put < size) {
ret = write(fd, data + put, size - put);
if (ret > 0) {
put += ret;
} else if (ret == 0) {
throw TransformExecutionException(
"The communication pipe to the transform process was closed!");
} else if (errno != EINTR) {
throw TransformExecutionException(
"Couldn't put data to the transfrom process!");
}
}
}
void PutSize(int fd, uint32_t size) {
PutData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
}
//////////////////////////////////////////////////////////////
// Functions used to get data from the started Python process.
//////////////////////////////////////////////////////////////
/// The data that is being sent from the Python process is always a
/// `std::vector<std::pair<std::string, Value>>` of data (array of pairs of
/// query and params). It is sent in the following way:
///
/// uint32_t number of elements being sent
/// uint32_t element 0 query size
/// char[] element 0 query data
/// data[] element 0 params
/// uint32_t element 1 query size
/// char[] element 1 query data
/// data[] element 1 params
/// ...
///
/// When sending the query parameters they have to be further encoded to enable
/// sending of None, Bool, Int, Float, Str, List and Dict objects. The encoding
/// is as follows:
///
/// None: uint8_t type (kTypeNone)
/// Bool: uint8_t type (kTypeBoolFalse or kTypeBoolTrue)
/// Int: uint8_t type (kTypeInt), int64_t value
/// Float: uint8_t type (kTypeFloat), double value
/// Str: uint8_t type (kTypeStr), uint32_t size, char[] data
/// List: uint8_t type (kTypeList), uint32_t size, data[] element 0,
/// data[] element 1, ...
/// Dict: uint8_t type (kTypeDict), uint32_t size, uint32_t element 0 key size,
/// char[] element 0 key data, data[] element 0 value,
/// uint32_t element 1 key size, char[] element 1 key data,
/// data[] element 1 value, ...
///
/// The sending end of the protocol is implemented in `kafka.py`.
const uint8_t kTypeNone = 0x10;
const uint8_t kTypeBoolFalse = 0x20;
const uint8_t kTypeBoolTrue = 0x21;
const uint8_t kTypeInt = 0x30;
const uint8_t kTypeFloat = 0x40;
const uint8_t kTypeStr = 0x50;
const uint8_t kTypeList = 0x60;
const uint8_t kTypeDict = 0x70;
void GetData(int fd, uint8_t *data, uint32_t size) {
int ret = 0;
uint32_t got = 0;
while (got < size) {
ret = read(fd, data + got, size - got);
if (ret > 0) {
got += ret;
} else if (ret == 0) {
throw TransformExecutionException(
"The communication pipe from the transform process was closed!");
} else if (errno != EINTR) {
throw TransformExecutionException(
"Couldn't get data from the transform process!");
}
}
}
uint32_t GetSize(int fd) {
uint32_t size = 0;
GetData(fd, reinterpret_cast<uint8_t *>(&size), sizeof(size));
return size;
}
void GetString(int fd, std::string *value) {
const int kMaxStackBuffer = 8192;
uint8_t buffer[kMaxStackBuffer];
uint32_t size = GetSize(fd);
if (size < kMaxStackBuffer) {
GetData(fd, buffer, size);
*value = std::string(reinterpret_cast<char *>(buffer), size);
} else {
std::unique_ptr<uint8_t[]> tmp(new uint8_t[size]);
GetData(fd, tmp.get(), size);
*value = std::string(reinterpret_cast<char *>(tmp.get()), size);
}
}
void GetValue(int fd, Value *value) {
uint8_t type = 0;
GetData(fd, &type, sizeof(type));
if (type == kTypeNone) {
*value = Value();
} else if (type == kTypeBoolFalse) {
*value = Value(false);
} else if (type == kTypeBoolTrue) {
*value = Value(true);
} else if (type == kTypeInt) {
int64_t tmp = 0;
GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
*value = Value(tmp);
} else if (type == kTypeFloat) {
double tmp = 0.0;
GetData(fd, reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp));
*value = Value(tmp);
} else if (type == kTypeStr) {
std::string tmp;
GetString(fd, &tmp);
*value = Value(tmp);
} else if (type == kTypeList) {
std::vector<Value> tmp_vec;
uint32_t size = GetSize(fd);
tmp_vec.reserve(size);
for (uint32_t i = 0; i < size; ++i) {
Value tmp_value;
GetValue(fd, &tmp_value);
tmp_vec.push_back(tmp_value);
}
*value = Value(tmp_vec);
} else if (type == kTypeDict) {
std::map<std::string, Value> tmp_map;
uint32_t size = GetSize(fd);
for (uint32_t i = 0; i < size; ++i) {
std::string tmp_key;
Value tmp_value;
GetString(fd, &tmp_key);
GetValue(fd, &tmp_value);
tmp_map.insert({tmp_key, tmp_value});
}
*value = Value(tmp_map);
} else {
throw TransformExecutionException(
fmt::format("Couldn't get value of unsupported type 0x{:02x}!", type));
}
}
} // namespace
namespace integrations::kafka {
Transform::Transform(const std::string &transform_script_path)
: transform_script_path_(transform_script_path) {}
std::vector<std::string> Transform::Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {
// TODO (msantl): dummy transform, do the actual transform later @mferencevic
std::vector<std::string> transformed_batch;
transformed_batch.reserve(batch.size());
for (auto &record : batch) {
transformed_batch.push_back(reinterpret_cast<char *>(record->payload()));
bool Transform::Start() {
// Setup communication pipes.
if (pipe2(pipe_to_python_, O_CLOEXEC) != 0) {
DLOG(ERROR) << "Couldn't create communication pipe from cpp to python!";
return false;
}
if (pipe2(pipe_from_python_, O_CLOEXEC) != 0) {
DLOG(ERROR) << "Couldn't create communication pipe from python to cpp!";
return false;
}
return transformed_batch;
// Find the top of the stack.
uint8_t *stack_top = stack_.get() + kStackSizeBytes;
// Set the target arguments.
target_arguments_->transform_script_path = transform_script_path_;
target_arguments_->pipe_to_python = pipe_to_python_[kPipeReadEnd];
target_arguments_->pipe_from_python = pipe_from_python_[kPipeWriteEnd];
// Create the process.
pid_ = clone(Target, stack_top, CLONE_VFORK, target_arguments_.get());
if (pid_ == -1) {
DLOG(ERROR) << "Couldn't create the communication process!";
return false;
}
// Close pipes that won't be used from the master process.
close(pipe_to_python_[kPipeReadEnd]);
close(pipe_from_python_[kPipeWriteEnd]);
return true;
}
} // namespace kafka
} // namespace integrations
void Transform::Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
std::function<void(const std::string &,
const std::map<std::string, Value> &)>
query_function) {
// Check that the process is alive.
if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) != 0) {
throw TransformExecutionException("The transform process has died!");
}
// Put the `batch` data to the transform process.
PutSize(pipe_to_python_[kPipeWriteEnd], batch.size());
for (const auto &item : batch) {
PutSize(pipe_to_python_[kPipeWriteEnd], item->len());
PutData(pipe_to_python_[kPipeWriteEnd],
reinterpret_cast<const uint8_t *>(item->payload()), item->len());
}
// Get `query` and `params` data from the transform process.
uint32_t size = GetSize(pipe_from_python_[kPipeReadEnd]);
for (uint32_t i = 0; i < size; ++i) {
std::string query;
Value params;
GetString(pipe_from_python_[kPipeReadEnd], &query);
GetValue(pipe_from_python_[kPipeReadEnd], &params);
query_function(query, params.ValueMap());
}
}
Transform::~Transform() {
// Try to terminate the process gracefully in `kTerminateTimeoutSec`.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i = 0; i < kTerminateTimeoutSec * 10; ++i) {
DLOG(INFO) << "Terminating the transform process with pid " << pid_;
kill(pid_, SIGTERM);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int ret = waitpid(pid_, &status_, WNOHANG | WUNTRACED);
if (ret == pid_ || ret == -1) {
break;
}
}
// If the process is still alive, kill it and wait for it to die.
if (waitpid(pid_, &status_, WNOHANG | WUNTRACED) == 0) {
DLOG(WARNING) << "Killing the transform process with pid " << pid_;
kill(pid_, SIGKILL);
waitpid(pid_, &status_, 0);
}
// Delete the working directory.
if (pid_ != -1) {
utils::DeleteDir(GetTemporaryPath(pid_));
}
// Close leftover open pipes.
// We have to be careful to close only the leftover open pipes (the
// pipe_to_python WriteEnd and pipe_from_python ReadEnd), the other two ends
// were closed in the function that created them because they aren't used from
// the master process (they are only used from the Python process).
close(pipe_to_python_[kPipeWriteEnd]);
close(pipe_from_python_[kPipeReadEnd]);
}
} // namespace integrations::kafka
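
To make the params encoding concrete, here is a standalone Python sketch of a
decoder that mirrors GetString()/GetValue() above and understands the same
type tags that kafka.py emits; the example buffer at the end is hand-built:

    import struct

    # Type tags, matching kafka.py and the constants above.
    TYPE_NONE, TYPE_BOOL_FALSE, TYPE_BOOL_TRUE = 0x10, 0x20, 0x21
    TYPE_INT, TYPE_FLOAT, TYPE_STR = 0x30, 0x40, 0x50
    TYPE_LIST, TYPE_DICT = 0x60, 0x70

    def decode_string(buf, pos):
        # A bare string: uint32_t size followed by UTF-8 data (no type tag).
        size = struct.unpack_from("I", buf, pos)[0]
        pos += 4
        return buf[pos:pos + size].decode("utf-8"), pos + size

    def decode_value(buf, pos):
        # Mirrors GetValue(): one uint8_t type tag, then the payload.
        typ = buf[pos]
        pos += 1
        if typ == TYPE_NONE:
            return None, pos
        if typ in (TYPE_BOOL_FALSE, TYPE_BOOL_TRUE):
            return typ == TYPE_BOOL_TRUE, pos
        if typ == TYPE_INT:
            return struct.unpack_from("q", buf, pos)[0], pos + 8
        if typ == TYPE_FLOAT:
            return struct.unpack_from("d", buf, pos)[0], pos + 8
        if typ == TYPE_STR:
            return decode_string(buf, pos)
        if typ == TYPE_LIST:
            count = struct.unpack_from("I", buf, pos)[0]
            pos += 4
            items = []
            for _ in range(count):
                item, pos = decode_value(buf, pos)
                items.append(item)
            return items, pos
        if typ == TYPE_DICT:
            count = struct.unpack_from("I", buf, pos)[0]
            pos += 4
            result = {}
            for _ in range(count):
                key, pos = decode_string(buf, pos)
                result[key], pos = decode_value(buf, pos)
            return result, pos
        raise ValueError("Unsupported type 0x{:02x}!".format(typ))

    # Hand-built example: the params dict {"id": 1}.
    buf = (bytes([TYPE_DICT]) + struct.pack("I", 1) +
           struct.pack("I", 2) + b"id" +
           bytes([TYPE_INT]) + struct.pack("q", 1))
    assert decode_value(buf, 0) == ({"id": 1}, len(buf))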

View File

@@ -1,26 +1,48 @@
#pragma once
#include <memory>
#include <experimental/filesystem>
#include <map>
#include <string>
#include <vector>
#include "rdkafkacpp.h"
namespace integrations {
namespace kafka {
#include "communication/bolt/v1/value.hpp"
namespace integrations::kafka {
struct TargetArguments {
std::experimental::filesystem::path transform_script_path;
int pipe_to_python{-1};
int pipe_from_python{-1};
};
class Transform final {
private:
const int kStackSizeBytes = 262144;
public:
Transform(const std::string &transform_script_path);
explicit Transform(const std::string &transform_script_path);
std::vector<std::string> Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch);
bool Start();
auto transform_script_path() const { return transform_script_path_; }
void Apply(const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
std::function<void(
const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
query_function);
~Transform();
private:
std::string transform_script_path_;
pid_t pid_{-1};
int status_{0};
// The stack used for the `clone` system call must be heap allocated.
std::unique_ptr<uint8_t[]> stack_{new uint8_t[kStackSizeBytes]};
// The target arguments passed to the new process must be heap allocated.
std::unique_ptr<TargetArguments> target_arguments_{new TargetArguments()};
int pipe_to_python_[2] = {-1, -1};
int pipe_from_python_[2] = {-1, -1};
};
} // namespace kafka
} // namespace integrations
} // namespace integrations::kafka

View File

@@ -279,17 +279,22 @@ void SingleNodeMain() {
SessionData session_data{db};
auto stream_writer =
[&session_data](const std::vector<std::string> &queries) {
for (auto &query : queries) {
auto dba = session_data.db.Access();
KafkaResultStream stream;
try {
session_data.interpreter(query, *dba, {}, false).PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(ERROR) << e.what();
dba->Abort();
}
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db.Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
try {
session_data.interpreter(query, *dba, params_tv, false)
.PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
dba->Abort();
}
};
@@ -370,17 +375,22 @@ void MasterMain() {
SessionData session_data{db};
auto stream_writer =
[&session_data](const std::vector<std::string> &queries) {
for (auto &query : queries) {
auto dba = session_data.db.Access();
KafkaResultStream stream;
try {
session_data.interpreter(query, *dba, {}, false).PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(ERROR) << e.what();
dba->Abort();
}
[&session_data](
const std::string &query,
const std::map<std::string, communication::bolt::Value> &params) {
auto dba = session_data.db.Access();
KafkaResultStream stream;
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : params)
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
try {
session_data.interpreter(query, *dba, params_tv, false)
.PullAll(stream);
dba->Commit();
} catch (const query::QueryException &e) {
LOG(WARNING) << "[Kafka] query execution failed with an exception: "
<< e.what();
dba->Abort();
}
};

View File

@@ -21,6 +21,7 @@
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "glue/conversion.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "query/context.hpp"
@@ -4152,7 +4153,7 @@ class ShowStreamsCursor : public Cursor {
frame[self_.uri_symbol()] = streams_it_->stream_uri;
frame[self_.topic_symbol()] = streams_it_->stream_topic;
frame[self_.transform_symbol()] = streams_it_->transform_uri;
frame[self_.status_symbol()] = streams_it_->is_running;
frame[self_.status_symbol()] = streams_it_->stream_status;
streams_it_++;
@@ -4165,9 +4166,9 @@
const ShowStreams &self_;
bool is_initialized_ = false;
using StreamInfo = integrations::kafka::StreamInfo;
std::vector<StreamInfo> streams_;
std::vector<StreamInfo>::iterator streams_it_ = streams_.begin();
using StreamStatus = integrations::kafka::StreamStatus;
std::vector<StreamStatus> streams_;
std::vector<StreamStatus>::iterator streams_it_ = streams_.begin();
};
std::unique_ptr<Cursor> ShowStreams::MakeCursor(
@@ -4267,15 +4268,16 @@ std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(
}
TestStream::TestStream(std::string stream_name, Expression *limit_batches,
Symbol test_result_symbol)
Symbol query_symbol, Symbol params_symbol)
: stream_name_(stream_name),
limit_batches_(limit_batches),
test_result_symbol_(test_result_symbol) {}
query_symbol_(query_symbol),
params_symbol_(params_symbol) {}
WITHOUT_SINGLE_INPUT(TestStream)
std::vector<Symbol> TestStream::OutputSymbols(const SymbolTable &) const {
return {test_result_symbol_};
return {query_symbol_, params_symbol_};
}
class TestStreamCursor : public Cursor {
@@ -4298,7 +4300,15 @@ class TestStreamCursor : public Cursor {
}
try {
results_ = ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
auto results =
ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
for (const auto &result : results) {
std::map<std::string, query::TypedValue> params_tv;
for (const auto &kv : result.second) {
params_tv.emplace(kv.first, glue::ToTypedValue(kv.second));
}
results_.emplace_back(result.first, params_tv);
}
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
@@ -4308,7 +4318,8 @@
if (results_it_ == results_.end()) return false;
frame[self_.test_result_symbol()] = *results_it_;
frame[self_.query_symbol()] = results_it_->first;
frame[self_.params_symbol()] = results_it_->second;
results_it_++;
return true;
@@ -4320,8 +4331,9 @@
const TestStream &self_;
bool is_initialized_ = false;
std::vector<std::string> results_;
std::vector<std::string>::iterator results_it_ = results_.begin();
std::vector<std::pair<std::string, TypedValue>> results_;
std::vector<std::pair<std::string, TypedValue>>::iterator results_it_ =
results_.begin();
};
std::unique_ptr<Cursor> TestStream::MakeCursor(

View File

@@ -2570,14 +2570,15 @@ stream is importing.")
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(test-result-symbol "Symbol" :reader t))
(query-symbol "Symbol" :reader t)
(params-symbol "Symbol" :reader t))
(:documentation
"Test a stream. This will start consuming messages but wont insert anything
in the db.")
(:public
#>cpp
TestStream(std::string stream_name, Expression *limit_batches,
Symbol test_result_symbol);
Symbol query_symbol, Symbol params_symbol);
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;

View File

@@ -212,7 +212,7 @@ class RuleBasedPlanner {
symbol_table.CreateSymbol("uri", false),
symbol_table.CreateSymbol("topic", false),
symbol_table.CreateSymbol("transform", false),
symbol_table.CreateSymbol("is_running", false));
symbol_table.CreateSymbol("status", false));
} else if (auto *start_stop_stream =
dynamic_cast<query::StartStopStream *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before StartStopStream";
@@ -230,7 +230,8 @@
auto &symbol_table = context.symbol_table;
input_op = std::make_unique<plan::TestStream>(
test_stream->stream_name_, test_stream->limit_batches_,
symbol_table.CreateSymbol("test_result", false));
symbol_table.CreateSymbol("query", false),
symbol_table.CreateSymbol("params", false));
} else {
throw utils::NotYetImplemented("clause conversion to operator(s)");
}

View File

@@ -81,6 +81,17 @@ void CheckDir(const std::string &dir) {
}
}
bool DeleteDir(const fs::path &dir) {
if (!fs::exists(dir)) return true;
std::error_code error_code; // Just for exception suppression.
return fs::remove_all(dir, error_code) > 0;
}
bool CopyFile(const fs::path &src, const fs::path &dst) {
std::error_code error_code; // Just for exception suppression.
return fs::copy_file(src, dst, error_code);
}
File::File() : fd_(-1), path_() {}
File::File(int fd, fs::path path) : fd_(fd), path_(std::move(path)) {}

View File

@@ -59,6 +59,17 @@ bool EnsureDir(const std::experimental::filesystem::path &dir);
*/
void CheckDir(const std::string &dir);
/**
* Deletes everything from the given dir, including the dir itself.
*/
bool DeleteDir(const std::experimental::filesystem::path &dir);
/**
* Copies the file from src to dst.
*/
bool CopyFile(const std::experimental::filesystem::path &src,
const std::experimental::filesystem::path &dst);
// End higher level operations.
// Lower level wrappers around C system calls follow.