Implement kafka transform functionality

Summary:
First iteration of the Kafka integration.
Currently, Memgraph streams won't use the transform script provided in the
`CREATE STREAM` query.

There is a manual test that serves as a POC, which we'll use to fully wire
Kafka into Memgraph.

Since streams need to download the transform script, I moved the cURL init
out of telemetry and into a new `requests` library.
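
For context, the wiring this revision adds to the `main` entry points boils
down to the following condensed sketch (pulled from the `SingleNodeMain()`
changes in the diff below; error handling, flag parsing and the distributed
variant are omitted):

requests::Init();  // cURL init, done once in WithInit() before memgraph_main()

database::SingleNode db;
SessionData session_data{db};

// Transformed batches arrive as plain query strings and are executed directly.
auto stream_writer = [&session_data](const std::vector<std::string> &queries) {
  database::GraphDbAccessor dba(session_data.db);
  for (auto &query : queries) {
    KafkaResultStream stream;  // results are discarded
    session_data.interpreter(query, dba, {}, false).PullAll(stream);
  }
  dba.Commit();
};

integrations::kafka::Streams kafka_streams{
    std::experimental::filesystem::path(FLAGS_durability_directory) / "streams",
    stream_writer};
kafka_streams.Recover();  // restore persisted streams and restart running ones
session_data.interpreter.kafka_streams_ = &kafka_streams;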

Reviewers: teon.banek, mferencevic

Reviewed By: mferencevic

Subscribers: ipaljak, pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1491
Matija Santl 2018-07-06 09:28:05 +02:00
parent 10b4e45166
commit 4c27596fdd
28 changed files with 632 additions and 156 deletions

init
View File

@ -12,7 +12,8 @@ required_pkgs=(git arcanist # source code control
libboost-iostreams-dev
libboost-serialization-dev
python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests
uuid-dev libcurl4-openssl-dev # for telemetry
uuid-dev # mg-utils
libcurl4-openssl-dev # mg-requests
sbcl # for custom Lisp C++ preprocessing
)

View File

@ -2,6 +2,7 @@
# add memgraph sub libraries, ordered by dependency
add_subdirectory(utils)
add_subdirectory(requests)
add_subdirectory(integrations)
add_subdirectory(io)
add_subdirectory(telemetry)
@ -189,7 +190,7 @@ set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags capnp kj
${Boost_IOSTREAMS_LIBRARY_RELEASE}
${Boost_SERIALIZATION_LIBRARY_RELEASE}
mg-utils mg-io mg-integrations mg-communication)
mg-utils mg-io mg-integrations mg-requests mg-communication)
if (USE_LTALLOC)
list(APPEND MEMGRAPH_ALL_LIBS ltalloc)

View File

@ -56,9 +56,6 @@ class PrivateBase : public GraphDb {
Storage &storage() override { return *storage_; }
durability::WriteAheadLog &wal() override { return wal_; }
integrations::kafka::Streams &kafka_streams() override {
return kafka_streams_;
}
int WorkerId() const override { return config_.worker_id; }
// Makes a local snapshot from the visibility of accessor
@ -101,7 +98,6 @@ class PrivateBase : public GraphDb {
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
integrations::kafka::Streams kafka_streams_;
};
template <template <typename TId> class TMapper>
@ -449,9 +445,6 @@ PublicBase::~PublicBase() {
GraphDb::Type PublicBase::type() const { return impl_->type(); }
Storage &PublicBase::storage() { return impl_->storage(); }
durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
integrations::kafka::Streams &PublicBase::kafka_streams() {
return impl_->kafka_streams();
}
tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
ConcurrentIdMapper<Label> &PublicBase::label_mapper() {
return impl_->label_mapper();

View File

@ -8,7 +8,6 @@
#include "database/storage.hpp"
#include "database/storage_gc.hpp"
#include "durability/wal.hpp"
#include "integrations/kafka/streams.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/types.hpp"
@ -93,7 +92,6 @@ class GraphDb {
virtual Type type() const = 0;
virtual Storage &storage() = 0;
virtual durability::WriteAheadLog &wal() = 0;
virtual integrations::kafka::Streams &kafka_streams() = 0;
virtual tx::Engine &tx_engine() = 0;
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::EdgeType>
@ -151,7 +149,6 @@ class PublicBase : public GraphDb {
Type type() const override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
integrations::kafka::Streams &kafka_streams() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;

View File

@ -1,7 +1,9 @@
set(integrations_src_files
kafka/streams.cpp
kafka/transform.cpp
kafka/consumer.cpp)
add_library(mg-integrations STATIC ${integrations_src_files})
target_link_libraries(mg-integrations Threads::Threads fmt glog gflags
librdkafka++ librdkafka zlib)
target_link_libraries(mg-integrations stdc++fs Threads::Threads fmt
glog gflags librdkafka++ librdkafka zlib json)
target_link_libraries(mg-integrations mg-utils mg-requests)

View File

@ -1,10 +1,12 @@
#include "integrations/kafka/consumer.hpp"
#include <chrono>
#include <thread>
#include "glog/logging.h"
#include "integrations/kafka/exceptions.hpp"
#include "utils/on_scope_exit.hpp"
namespace integrations {
namespace kafka {
@ -12,7 +14,7 @@ namespace kafka {
using namespace std::chrono_literals;
constexpr int64_t kDefaultBatchIntervalMillis = 100;
constexpr int64_t kDefaultBatchSize = 10;
constexpr int64_t kDefaultBatchSize = 1000;
constexpr int64_t kDefaultTestBatchLimit = 1;
void Consumer::event_cb(RdKafka::Event &event) {
@ -26,7 +28,12 @@ void Consumer::event_cb(RdKafka::Event &event) {
}
}
Consumer::Consumer(const StreamInfo &info) : info_(info) {
Consumer::Consumer(
const StreamInfo &info, const std::string &transform_script_path,
std::function<void(const std::vector<std::string> &)> stream_writer)
: info_(info),
stream_writer_(stream_writer),
transform_(info.transform_uri, transform_script_path) {
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::string error;
@ -94,17 +101,25 @@ Consumer::Consumer(const StreamInfo &info) : info_(info) {
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();
// Set limit_batches to nullopt since it's not running anymore.
info_.limit_batches = std::experimental::nullopt;
}
void Consumer::StartConsuming(
std::experimental::optional<int64_t> limit_batches) {
info_.limit_batches = limit_batches;
is_running_.store(true);
thread_ = std::thread([this, limit_batches]() {
int64_t batch_count = 0;
is_running_.store(true);
while (is_running_) {
// TODO (msantl): Figure out what to do with potential exceptions here.
auto batch = this->GetBatch();
// TODO (msantl): transform the batch
auto transformed_batch = transform_.Apply(batch);
stream_writer_(transformed_batch);
if (limit_batches != std::experimental::nullopt) {
if (limit_batches <= ++batch_count) {
is_running_.store(false);
@ -117,7 +132,6 @@ void Consumer::StartConsuming(
std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
std::vector<std::unique_ptr<RdKafka::Message>> batch;
bool run_batch = false;
auto start = std::chrono::system_clock::now();
int64_t remaining_timeout_in_ms =
info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
@ -125,7 +139,8 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
batch.reserve(batch_size);
for (int64_t i = 0; i < batch_size; ++i) {
bool run_batch = true;
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) {
std::unique_ptr<RdKafka::Message> msg(
consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
@ -151,10 +166,6 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
auto now = std::chrono::system_clock::now();
auto took =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
if (took.count() >= remaining_timeout_in_ms) {
break;
}
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
@ -186,7 +197,7 @@ void Consumer::Stop() {
StopConsuming();
}
void Consumer::StartIfNotStopped() {
void Consumer::StartIfStopped() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
@ -196,7 +207,7 @@ void Consumer::StartIfNotStopped() {
}
}
void Consumer::StopIfNotRunning() {
void Consumer::StopIfRunning() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
@ -221,16 +232,16 @@ std::vector<std::string> Consumer::Test(
is_running_.store(true);
utils::OnScopeExit cleanup([this]() { is_running_.store(false); });
for (int64_t i = 0; i < num_of_batches; ++i) {
auto batch = GetBatch();
// TODO (msantl): transform the batch
auto transformed_batch = transform_.Apply(batch);
for (auto &result : batch) {
results.push_back(
std::string(reinterpret_cast<char *>(result->payload())));
for (auto &record : transformed_batch) {
results.emplace_back(std::move(record));
}
}
is_running_.store(false);
return results;
}

View File

@ -10,6 +10,8 @@
#include "rdkafkacpp.h"
#include "integrations/kafka/transform.hpp"
namespace integrations {
namespace kafka {
@ -21,6 +23,8 @@ struct StreamInfo {
std::experimental::optional<int64_t> batch_interval_in_ms;
std::experimental::optional<int64_t> batch_size;
std::experimental::optional<int64_t> limit_batches;
bool is_running = false;
};
@ -28,7 +32,8 @@ class Consumer final : public RdKafka::EventCb {
public:
Consumer() = delete;
explicit Consumer(const StreamInfo &info);
Consumer(const StreamInfo &info, const std::string &transform_script_path,
std::function<void(const std::vector<std::string> &)> stream_writer);
Consumer(const Consumer &other) = delete;
Consumer(Consumer &&other) = delete;
@ -40,9 +45,9 @@ class Consumer final : public RdKafka::EventCb {
void Stop();
void StartIfNotStopped();
void StartIfStopped();
void StopIfNotRunning();
void StopIfRunning();
std::vector<std::string> Test(
std::experimental::optional<int64_t> limit_batches);
@ -50,9 +55,11 @@ class Consumer final : public RdKafka::EventCb {
StreamInfo info();
private:
void event_cb(RdKafka::Event &event) override;
StreamInfo info_;
std::string transform_script_path_;
std::function<void(const std::vector<std::string> &)> stream_writer_;
Transform transform_;
std::atomic<bool> is_running_{false};
std::thread thread_;
@ -61,6 +68,8 @@ class Consumer final : public RdKafka::EventCb {
std::function<void(RdKafka::KafkaConsumer *)>>
consumer_;
void event_cb(RdKafka::Event &event) override;
void StopConsuming();
void StartConsuming(std::experimental::optional<int64_t> limit_batches);

View File

@ -4,6 +4,8 @@
#include <fmt/format.h>
namespace integrations {
namespace kafka {
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
@ -22,6 +24,33 @@ class StreamDoesntExistException : public KafkaStreamException {
fmt::format("Kafka stream {} doesn't exist.", stream_name)) {}
};
class StreamSerializationException : public KafkaStreamException {
public:
StreamSerializationException()
: KafkaStreamException("Failed to serialize stream data!") {}
};
class StreamDeserializationException : public KafkaStreamException {
public:
StreamDeserializationException()
: KafkaStreamException("Failed to deserialize stream data!") {}
};
class StreamMetadataCouldNotBeStored : public KafkaStreamException {
public:
explicit StreamMetadataCouldNotBeStored(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't persist stream metadata for stream {}", stream_name)) {}
};
class StreamMetadataCouldNotBeDeleted : public KafkaStreamException {
public:
explicit StreamMetadataCouldNotBeDeleted(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't delete persisted stream metadata for stream {}",
stream_name)) {}
};
class ConsumerFailedToInitializeException : public KafkaStreamException {
public:
ConsumerFailedToInitializeException(const std::string &stream_name,
@ -57,3 +86,36 @@ class TopicNotFoundException : public KafkaStreamException {
: KafkaStreamException(
fmt::format("Kafka stream {}, topic not found", stream_name)) {}
};
class TransformScriptNotFoundException : public KafkaStreamException {
public:
explicit TransformScriptNotFoundException(const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't find transform script for {}", stream_name)) {}
};
class TransformScriptDownloadException : public KafkaStreamException {
public:
explicit TransformScriptDownloadException(const std::string &transform_uri)
: KafkaStreamException(fmt::format(
"Couldn't get the transform script from {}", transform_uri)) {}
};
class TransformScriptCouldNotBeCreatedException : public KafkaStreamException {
public:
explicit TransformScriptCouldNotBeCreatedException(
const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't create transform script for stream {}", stream_name)) {}
};
class TransformScriptCouldNotBeDeletedException : public KafkaStreamException {
public:
explicit TransformScriptCouldNotBeDeletedException(
const std::string &stream_name)
: KafkaStreamException(fmt::format(
"Couldn't delete transform script for stream {}", stream_name)) {}
};
} // namespace kafka
} // namespace integrations

View File

@ -1,60 +1,238 @@
#include "integrations/kafka/streams.hpp"
#include <cstdio>
#include <experimental/filesystem>
#include <experimental/optional>
#include <json/json.hpp>
#include "integrations/kafka/exceptions.hpp"
#include "utils/file.hpp"
namespace integrations {
namespace kafka {
void Streams::CreateStream(const StreamInfo &info) {
namespace fs = std::experimental::filesystem;
const std::string kMetadataDir = "metadata";
const std::string kTransformDir = "transform";
const std::string kTransformExt = ".py";
namespace {
nlohmann::json Serialize(const StreamInfo &info) {
nlohmann::json data = nlohmann::json::object();
data["stream_name"] = info.stream_name;
data["stream_uri"] = info.stream_uri;
data["stream_topic"] = info.stream_topic;
data["transform_uri"] = info.transform_uri;
if (info.batch_interval_in_ms) {
data["batch_interval_in_ms"] = info.batch_interval_in_ms.value();
} else {
data["batch_interval_in_ms"] = nullptr;
}
if (info.batch_size) {
data["batch_size"] = info.batch_size.value();
} else {
data["batch_size"] = nullptr;
}
if (info.limit_batches) {
data["limit_batches"] = info.limit_batches.value();
} else {
data["limit_batches"] = nullptr;
}
data["is_running"] = info.is_running;
return data;
}
StreamInfo Deserialize(const nlohmann::json &data) {
if (!data.is_object()) throw StreamDeserializationException();
StreamInfo info;
if (!data["stream_name"].is_string()) throw StreamDeserializationException();
info.stream_name = data["stream_name"];
if (!data["stream_uri"].is_string()) throw StreamDeserializationException();
info.stream_uri = data["stream_uri"];
if (!data["stream_topic"].is_string()) throw StreamDeserializationException();
info.stream_topic = data["stream_topic"];
if (!data["transform_uri"].is_string())
throw StreamDeserializationException();
info.transform_uri = data["transform_uri"];
if (data["batch_interval_in_ms"].is_number()) {
info.batch_interval_in_ms = data["batch_interval_in_ms"];
} else if (data["batch_interval_in_ms"].is_null()) {
info.batch_interval_in_ms = std::experimental::nullopt;
} else {
throw StreamDeserializationException();
}
if (data["batch_size"].is_number()) {
info.batch_size = data["batch_size"];
} else if (data["batch_size"].is_null()) {
info.batch_size = std::experimental::nullopt;
} else {
throw StreamDeserializationException();
}
if (!data["is_running"].is_boolean()) throw StreamDeserializationException();
info.is_running = data["is_running"];
if (data["limit_batches"].is_number()) {
info.limit_batches = data["limit_batches"];
} else if (data["limit_batches"].is_null()) {
info.limit_batches = std::experimental::nullopt;
} else {
throw StreamDeserializationException();
}
return info;
}
} // namespace
Streams::Streams(
const std::string &streams_directory,
std::function<void(const std::vector<std::string> &)> stream_writer)
: streams_directory_(streams_directory),
stream_writer_(stream_writer),
metadata_store_(fs::path(streams_directory) / kMetadataDir) {}
void Streams::Recover() {
for (auto it = metadata_store_.begin(); it != metadata_store_.end(); ++it) {
// Check if the transform script also exists;
auto transform_script = GetTransformScriptPath(it->first);
if (!fs::exists(transform_script))
throw TransformScriptNotFoundException(it->first);
nlohmann::json data;
try {
data = nlohmann::json::parse(it->second);
} catch (const nlohmann::json::parse_error &e) {
throw StreamDeserializationException();
}
StreamInfo info = Deserialize(data);
Create(info);
if (info.is_running) Start(info.stream_name, info.limit_batches);
}
}
void Streams::Create(const StreamInfo &info) {
std::lock_guard<std::mutex> g(mutex_);
if (consumers_.find(info.stream_name) != consumers_.end())
throw StreamExistsException(info.stream_name);
consumers_.emplace(info.stream_name, info);
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(info.stream_name, Serialize(info).dump())) {
throw StreamMetadataCouldNotBeStored(info.stream_name);
}
// Make sure transform directory exists or we can create it.
if (!utils::EnsureDir(GetTransformScriptDir())) {
throw TransformScriptCouldNotBeCreatedException(info.stream_name);
}
try {
consumers_.emplace(
std::piecewise_construct, std::forward_as_tuple(info.stream_name),
std::forward_as_tuple(info, GetTransformScriptPath(info.stream_name),
stream_writer_));
} catch (const KafkaStreamException &e) {
// If we failed to create the consumer, remove the persisted metadata.
metadata_store_.Delete(info.stream_name);
// Rethrow the exception.
throw;
}
}
void Streams::DropStream(const std::string &stream_name) {
void Streams::Drop(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
// Erase and implicitly stop the consumer.
consumers_.erase(find_it);
// Remove stream_info in metadata_store_.
if (!metadata_store_.Delete(stream_name)) {
throw StreamMetadataCouldNotBeDeleted(stream_name);
}
// Remove transform script.
if (std::remove(GetTransformScriptPath(stream_name).c_str())) {
throw TransformScriptNotFoundException(stream_name);
}
}
void Streams::StartStream(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit) {
void Streams::Start(const std::string &stream_name,
std::experimental::optional<int64_t> limit_batches) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Start(batch_limit);
find_it->second.Start(limit_batches);
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
void Streams::StopStream(const std::string &stream_name) {
void Streams::Stop(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Stop();
}
void Streams::StartAllStreams() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StartIfNotStopped();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
void Streams::StopAllStreams() {
void Streams::StartAll() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StopIfNotRunning();
consumer_kv.second.StartIfStopped();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}
}
std::vector<StreamInfo> Streams::ShowStreams() {
void Streams::StopAll() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StopIfRunning();
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}
}
std::vector<StreamInfo> Streams::Show() {
std::vector<StreamInfo> streams;
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
@ -64,15 +242,23 @@ std::vector<StreamInfo> Streams::ShowStreams() {
return streams;
}
std::vector<std::string> Streams::TestStream(
std::vector<std::string> Streams::Test(
const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit) {
std::experimental::optional<int64_t> limit_batches) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
return find_it->second.Test(batch_limit);
return find_it->second.Test(limit_batches);
}
std::string Streams::GetTransformScriptDir() {
return fs::path(streams_directory_) / kTransformDir;
}
std::string Streams::GetTransformScriptPath(const std::string &stream_name) {
return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt);
}
} // namespace kafka
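
Putting the path helpers above together, each stream leaves two artifacts on
disk under the directory handed to the Streams constructor (in the mains below
that is `<durability_directory>/streams`). A sketch of the layout implied by
kMetadataDir, kTransformDir and kTransformExt:

<streams_directory>/metadata/             # KVStore mapping stream name -> Serialize(StreamInfo).dump()
<streams_directory>/transform/<name>.py   # transform script downloaded from transform_uri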

View File

@ -6,37 +6,49 @@
#include <mutex>
#include <unordered_map>
#include "storage/kvstore.hpp"
namespace integrations {
namespace kafka {
class Streams final {
public:
void CreateStream(const StreamInfo &info);
Streams(const std::string &streams_directory,
std::function<void(const std::vector<std::string> &)> stream_writer);
void DropStream(const std::string &stream_name);
void Recover();
void StartStream(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
void Create(const StreamInfo &info);
void StopStream(const std::string &stream_name);
void Drop(const std::string &stream_name);
void StartAllStreams();
void Start(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
void StopAllStreams();
void Stop(const std::string &stream_name);
std::vector<StreamInfo> ShowStreams();
void StartAll();
std::vector<std::string> TestStream(
const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
void StopAll();
std::vector<StreamInfo> Show();
std::vector<std::string> Test(const std::string &stream_name,
std::experimental::optional<int64_t>
batch_limit = std::experimental::nullopt);
private:
std::string streams_directory_;
std::function<void(const std::vector<std::string> &)> stream_writer_;
storage::KVStore metadata_store_;
std::mutex mutex_;
std::unordered_map<std::string, Consumer> consumers_;
// TODO (msantl): persist stream storage
std::string GetTransformScriptDir();
std::string GetTransformScriptPath(const std::string &stream_name);
};
} // namespace kafka

View File

@ -0,0 +1,31 @@
#include "integrations/kafka/transform.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "requests/requests.hpp"
namespace integrations {
namespace kafka {
Transform::Transform(const std::string &transform_script_uri,
const std::string &transform_script_path)
: transform_script_path_(transform_script_path) {
if (!requests::CreateAndDownloadFile(transform_script_uri,
transform_script_path)) {
throw TransformScriptDownloadException(transform_script_uri);
}
}
std::vector<std::string> Transform::Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {
// TODO (msantl): dummy transform, do the actual transform later @mferencevic
std::vector<std::string> transformed_batch;
transformed_batch.reserve(batch.size());
for (auto &record : batch) {
transformed_batch.push_back(reinterpret_cast<char *>(record->payload()));
}
return transformed_batch;
}
} // namespace kafka
} // namespace integrations

View File

@ -0,0 +1,27 @@
#pragma once
#include <memory>
#include <string>
#include <vector>
#include "rdkafkacpp.h"
namespace integrations {
namespace kafka {
class Transform final {
public:
Transform(const std::string &transform_script_uri,
const std::string &transform_script_path);
std::vector<std::string> Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch);
auto transform_script_path() const { return transform_script_path_; }
private:
std::string transform_script_path_;
};
} // namespace kafka
} // namespace integrations
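
A minimal sketch of how the Consumer drives this class (compare consumer.cpp
above; the URI and local path here are placeholders):

#include "integrations/kafka/transform.hpp"

void TransformSketch() {
  // Construction downloads the script via requests::CreateAndDownloadFile and
  // throws TransformScriptDownloadException if the download fails.
  integrations::kafka::Transform transform(
      "http://example.com/transform.py",       // transform_script_uri
      "/tmp/streams/transform/my_stream.py");  // transform_script_path

  // In a running Consumer the batch comes from GetBatch(); Apply() currently
  // just copies each message payload into a string (see the TODO above).
  std::vector<std::unique_ptr<RdKafka::Message>> batch;
  std::vector<std::string> queries = transform.Apply(batch);
}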

View File

@ -16,9 +16,12 @@
#include "database/graph_db.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "glue/conversion.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "query/exceptions.hpp"
#include "query/interpreter.hpp"
#include "query/transaction_engine.hpp"
#include "requests/requests.hpp"
#include "stats/stats.hpp"
#include "telemetry/telemetry.hpp"
#include "utils/flag_validation.hpp"
@ -140,6 +143,17 @@ class BoltSession final
using ServerT = communication::Server<BoltSession, SessionData>;
using communication::ServerContext;
/**
* Class that implements ResultStream API for Kafka.
*
* Kafka doesn't need to stream the import results back to the client so we
* don't need any functionality here.
*/
class KafkaResultStream {
public:
void Result(const std::vector<query::TypedValue> &) {}
};
// Needed to correctly handle memgraph destruction from a signal handler.
// Without having some sort of a flag, it is possible that a signal is handled
// when we are exiting main, inside destructors of database::GraphDb and
@ -239,6 +253,8 @@ int WithInit(int argc, char **argv,
<< " MB left.";
});
}
requests::Init();
memgraph_main();
return 0;
}
@ -248,6 +264,34 @@ void SingleNodeMain() {
database::SingleNode db;
SessionData session_data{db};
auto stream_writer =
[&session_data](const std::vector<std::string> &queries) {
database::GraphDbAccessor dba(session_data.db);
for (auto &query : queries) {
KafkaResultStream stream;
try {
session_data.interpreter(query, dba, {}, false).PullAll(stream);
} catch (const query::QueryException &e) {
LOG(ERROR) << e.what();
}
}
dba.Commit();
};
integrations::kafka::Streams kafka_streams{
std::experimental::filesystem::path(FLAGS_durability_directory) /
"streams",
stream_writer};
try {
// Recover possible streams.
kafka_streams.Recover();
} catch (const integrations::kafka::KafkaStreamException &e) {
LOG(ERROR) << e.what();
}
session_data.interpreter.kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
@ -262,7 +306,6 @@ void SingleNodeMain() {
// Setup telemetry
std::experimental::optional<telemetry::Telemetry> telemetry;
if (FLAGS_telemetry_enabled) {
telemetry::Init();
telemetry.emplace(
"https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
std::experimental::filesystem::path(FLAGS_durability_directory) /
@ -310,6 +353,34 @@ void MasterMain() {
database::Master db;
SessionData session_data{db};
auto stream_writer =
[&session_data](const std::vector<std::string> &queries) {
database::GraphDbAccessor dba(session_data.db);
for (auto &query : queries) {
KafkaResultStream stream;
try {
session_data.interpreter(query, dba, {}, false).PullAll(stream);
} catch (const query::QueryException &e) {
LOG(ERROR) << e.what();
}
}
dba.Commit();
};
integrations::kafka::Streams kafka_streams{
std::experimental::filesystem::path(FLAGS_durability_directory) /
"streams",
stream_writer};
try {
// Recover possible streams.
kafka_streams.Recover();
} catch (const integrations::kafka::KafkaStreamException &e) {
LOG(ERROR) << e.what();
}
session_data.interpreter.kafka_streams_ = &kafka_streams;
ServerContext context;
std::string service_name = "Bolt";
if (FLAGS_key_file != "" && FLAGS_cert_file != "") {

View File

@ -5,6 +5,12 @@
#include "query/frontend/semantic/symbol_table.hpp"
#include "query/parameters.hpp"
namespace integrations {
namespace kafka {
class Streams;
} // namespace kafka
} // namespace integrations
namespace query {
class Context {
@ -25,6 +31,8 @@ class Context {
bool in_explicit_transaction_ = false;
bool is_index_created_ = false;
int64_t timestamp_{-1};
integrations::kafka::Streams *kafka_streams_ = nullptr;
};
} // namespace query

View File

@ -61,6 +61,7 @@ Interpreter::Results Interpreter::operator()(
ctx.timestamp_ = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
ctx.kafka_streams_ = kafka_streams_;
// query -> stripped query
StrippedQuery stripped(query);

View File

@ -20,6 +20,12 @@ namespace distributed {
class PlanDispatcher;
}
namespace integrations {
namespace kafka {
class Streams;
} // namespace kafka
} // namespace integrations
namespace query {
class Interpreter {
@ -159,6 +165,8 @@ class Interpreter {
const std::map<std::string, TypedValue> &params,
bool in_explicit_transaction);
integrations::kafka::Streams *kafka_streams_ = nullptr;
private:
ConcurrentMap<HashType, AstStorage> ast_cache_;
PlanCacheT plan_cache_;

View File

@ -16,12 +16,14 @@
#include "boost/serialization/export.hpp"
#include "glog/logging.h"
#include "communication/result_stream_faker.hpp"
#include "database/graph_db_accessor.hpp"
#include "distributed/bfs_rpc_clients.hpp"
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "integrations/kafka/streams.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
@ -3916,8 +3918,8 @@ class CreateStreamCursor : public Cursor {
using StreamInfo = integrations::kafka::StreamInfo;
public:
CreateStreamCursor(const CreateStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
CreateStreamCursor(const CreateStream &self, database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -3948,8 +3950,8 @@ class CreateStreamCursor : public Cursor {
info.batch_interval_in_ms = batch_interval_in_ms;
info.batch_size = batch_size;
db_.db().kafka_streams().CreateStream(info);
} catch (const KafkaStreamException &e) {
ctx.kafka_streams_->Create(info);
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
@ -3960,7 +3962,6 @@ class CreateStreamCursor : public Cursor {
private:
const CreateStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> CreateStream::MakeCursor(
@ -3975,8 +3976,8 @@ WITHOUT_SINGLE_INPUT(DropStream)
class DropStreamCursor : public Cursor {
public:
DropStreamCursor(const DropStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
DropStreamCursor(const DropStream &self, database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -3984,8 +3985,8 @@ class DropStreamCursor : public Cursor {
}
try {
db_.db().kafka_streams().DropStream(self_.stream_name());
} catch (const KafkaStreamException &e) {
ctx.kafka_streams_->Drop(self_.stream_name());
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return false;
@ -3995,7 +3996,6 @@ class DropStreamCursor : public Cursor {
private:
const DropStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> DropStream::MakeCursor(
@ -4021,8 +4021,8 @@ std::vector<Symbol> ShowStreams::OutputSymbols(const SymbolTable &) const {
class ShowStreamsCursor : public Cursor {
public:
ShowStreamsCursor(const ShowStreams &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
ShowStreamsCursor(const ShowStreams &self, database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -4030,7 +4030,7 @@ class ShowStreamsCursor : public Cursor {
}
if (!is_initialized_) {
streams_ = db_.db().kafka_streams().ShowStreams();
streams_ = ctx.kafka_streams_->Show();
streams_it_ = streams_.begin();
is_initialized_ = true;
}
@ -4052,7 +4052,6 @@ class ShowStreamsCursor : public Cursor {
private:
const ShowStreams &self_;
database::GraphDbAccessor &db_;
bool is_initialized_ = false;
using StreamInfo = integrations::kafka::StreamInfo;
@ -4076,8 +4075,8 @@ WITHOUT_SINGLE_INPUT(StartStopStream)
class StartStopStreamCursor : public Cursor {
public:
StartStopStreamCursor(const StartStopStream &self,
database::GraphDbAccessor &db)
: self_(self), db_(db) {}
database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -4093,12 +4092,11 @@ class StartStopStreamCursor : public Cursor {
try {
if (self_.is_start()) {
db_.db().kafka_streams().StartStream(self_.stream_name(),
limit_batches);
ctx.kafka_streams_->Start(self_.stream_name(), limit_batches);
} else {
db_.db().kafka_streams().StopStream(self_.stream_name());
ctx.kafka_streams_->Stop(self_.stream_name());
}
} catch (const KafkaStreamException &e) {
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
@ -4109,7 +4107,6 @@ class StartStopStreamCursor : public Cursor {
private:
const StartStopStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> StartStopStream::MakeCursor(
@ -4124,8 +4121,8 @@ WITHOUT_SINGLE_INPUT(StartStopAllStreams)
class StartStopAllStreamsCursor : public Cursor {
public:
StartStopAllStreamsCursor(const StartStopAllStreams &self,
database::GraphDbAccessor &db)
: self_(self), db_(db) {}
database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -4134,11 +4131,11 @@ class StartStopAllStreamsCursor : public Cursor {
try {
if (self_.is_start()) {
db_.db().kafka_streams().StartAllStreams();
ctx.kafka_streams_->StartAll();
} else {
db_.db().kafka_streams().StopAllStreams();
ctx.kafka_streams_->StopAll();
}
} catch (const KafkaStreamException &e) {
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
@ -4151,7 +4148,6 @@ class StartStopAllStreamsCursor : public Cursor {
private:
const StartStopAllStreams &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(
@ -4167,10 +4163,14 @@ TestStream::TestStream(std::string stream_name, Expression *limit_batches,
WITHOUT_SINGLE_INPUT(TestStream)
std::vector<Symbol> TestStream::OutputSymbols(const SymbolTable &) const {
return {test_result_symbol_};
}
class TestStreamCursor : public Cursor {
public:
TestStreamCursor(const TestStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
TestStreamCursor(const TestStream &self, database::GraphDbAccessor &)
: self_(self) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
@ -4187,9 +4187,8 @@ class TestStreamCursor : public Cursor {
}
try {
results_ = db_.db().kafka_streams().TestStream(self_.stream_name(),
limit_batches);
} catch (const KafkaStreamException &e) {
results_ = ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
} catch (const integrations::kafka::KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
results_it_ = results_.begin();
@ -4208,7 +4207,6 @@ class TestStreamCursor : public Cursor {
private:
const TestStream &self_;
database::GraphDbAccessor &db_;
bool is_initialized_ = false;
std::vector<std::string> results_;

View File

@ -2703,6 +2703,7 @@ in the db.")
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
return {};
}

View File

@ -212,7 +212,7 @@ class RuleBasedPlanner {
symbol_table.CreateSymbol("uri", false),
symbol_table.CreateSymbol("topic", false),
symbol_table.CreateSymbol("transform", false),
symbol_table.CreateSymbol("is running", false));
symbol_table.CreateSymbol("is_running", false));
} else if (auto *start_stop_stream =
dynamic_cast<query::StartStopStream *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before StartStopStream";
@ -230,7 +230,7 @@ class RuleBasedPlanner {
auto &symbol_table = context.symbol_table;
input_op = std::make_unique<plan::TestStream>(
test_stream->stream_name_, test_stream->limit_batches_,
symbol_table.CreateSymbol("test result", false));
symbol_table.CreateSymbol("test_result", false));
} else {
throw utils::NotYetImplemented("clause conversion to operator(s)");
}

View File

@ -0,0 +1,8 @@
set(requests_src_files
requests.cpp)
find_package(CURL REQUIRED)
add_library(mg-requests STATIC ${requests_src_files})
target_link_libraries(mg-requests fmt glog gflags json ${CURL_LIBRARIES})
target_include_directories(mg-requests PRIVATE ${CURL_INCLUDE_DIRS})

View File

@ -1,21 +1,26 @@
#include "telemetry/requests.hpp"
#include "requests/requests.hpp"
#include <cstdio>
#include <curl/curl.h>
#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
namespace telemetry {
namespace requests {
void RequestsInit() { curl_global_init(CURL_GLOBAL_ALL); }
namespace {
size_t CurlWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) {
return nmemb;
}
} // namespace
void Init() { curl_global_init(CURL_GLOBAL_ALL); }
bool RequestPostJson(const std::string &url, const nlohmann::json &data,
const int timeout) {
int timeout_in_seconds) {
CURL *curl = nullptr;
CURLcode res = CURLE_UNSUPPORTED_PROTOCOL;
@ -40,7 +45,7 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data,
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout_in_seconds);
res = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
@ -60,4 +65,46 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data,
return true;
}
} // namespace telemetry
bool CreateAndDownloadFile(const std::string &url, const std::string &path,
int timeout_in_seconds) {
CURL *curl = nullptr;
CURLcode res = CURLE_UNSUPPORTED_PROTOCOL;
long response_code = 0;
std::string user_agent = fmt::format("memgraph/{}", gflags::VersionString());
curl = curl_easy_init();
if (!curl) return false;
FILE *file = std::fopen(path.c_str(), "w");
if (!file) return false;
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(curl, CURLOPT_USERAGENT, user_agent.c_str());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, file);
curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout_in_seconds);
res = curl_easy_perform(curl);
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
curl_easy_cleanup(curl);
std::fclose(file);
if (res != CURLE_OK) {
DLOG(WARNING) << "Couldn't perform request: " << curl_easy_strerror(res);
return false;
}
if (response_code != 200) {
DLOG(WARNING) << "Request response code isn't 200 (received "
<< response_code << ")!";
return false;
}
return true;
}
} // namespace requests

src/requests/requests.hpp Normal file
View File

@ -0,0 +1,41 @@
#pragma once
#include <string>
#include <json/json.hpp>
namespace requests {
/**
* Call this function in each `main` file that uses the Requests stack. It is
* used to initialize all libraries (primarily cURL).
*
* NOTE: This function must be called **exactly** once.
*/
void Init();
/**
*
* This function sends a POST request with a JSON payload to the `url`.
*
* @param url url to which to send the request
* @param data json payload
* @param timeout the timeout that should be used when making the request
* @return bool true if the request was successful, false otherwise.
*/
bool RequestPostJson(const std::string &url, const nlohmann::json &data,
int timeout_in_seconds = 10);
/**
* This function sends a GET request to the given `url` and writes the response
* to the given `path`.
*
* @param url url to which to send the request
* @param path path to the file where the response is written
* @param timeout the timeout that should be used when making the request
* @return bool true if the request was successful, false otherwise.
*/
bool CreateAndDownloadFile(const std::string &url, const std::string &path,
int timeout_in_seconds = 10);
} // namespace requests
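
A minimal usage sketch of the new library (the URL and target path are
placeholders):

#include <json/json.hpp>

#include "requests/requests.hpp"

int main() {
  requests::Init();  // must be called exactly once per binary

  nlohmann::json payload{{"event", "ping"}};
  bool posted = requests::RequestPostJson("https://example.com/collect", payload);

  bool downloaded = requests::CreateAndDownloadFile(
      "https://example.com/transform.py", "/tmp/transform.py");

  return (posted && downloaded) ? 0 : 1;
}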

View File

@ -1,11 +1,7 @@
set(telemetry_src_files
collectors.cpp
requests.cpp
telemetry.cpp
system_info.cpp)
find_package(CURL REQUIRED)
add_library(telemetry_lib STATIC ${telemetry_src_files})
target_link_libraries(telemetry_lib glog json ${CURL_LIBRARIES} kvstore_lib)
target_include_directories(telemetry_lib PRIVATE ${CURL_INCLUDE_DIRS})
target_link_libraries(telemetry_lib glog mg-requests kvstore_lib)

View File

@ -1,26 +0,0 @@
#pragma once
#include <string>
#include <json/json.hpp>
namespace telemetry {
/**
* This function is called by the main telemetry `Init` function to initialize
* the requests library.
*/
void RequestsInit();
/**
* This function sends a POST request with a JSON payload to the `url`.
*
* @param url url to which to send the request
* @param data json payload
* @param timeout the timeout that should be used when making the request
* @return bool true if the request was successful, false otherwise.
*/
bool RequestPostJson(const std::string &url, const nlohmann::json &data,
const int timeout = 10);
} // namespace telemetry

View File

@ -5,8 +5,8 @@
#include <fmt/format.h>
#include <glog/logging.h>
#include "requests/requests.hpp"
#include "telemetry/collectors.hpp"
#include "telemetry/requests.hpp"
#include "telemetry/system_info.hpp"
#include "utils/timestamp.hpp"
#include "utils/uuid.hpp"
@ -15,8 +15,6 @@ namespace telemetry {
const int kMaxBatchSize = 100;
void Init() { RequestsInit(); }
Telemetry::Telemetry(
const std::string &url,
const std::experimental::filesystem::path &storage_directory,
@ -69,7 +67,7 @@ void Telemetry::SendData() {
}
}
if (RequestPostJson(url_, payload)) {
if (requests::RequestPostJson(url_, payload)) {
for (const auto &key : keys) {
if (!storage_.Delete(key)) {
DLOG(WARNING) << "Couldn't delete key " << key

View File

@ -11,14 +11,6 @@
namespace telemetry {
/**
* Call this function in each `main` file that uses the Telemetry stack. It is
* used to initialize all libraries (primarily cURL).
*
* NOTE: This function must be called **exactly** once.
*/
void Init();
/**
* This class implements the telemetry collector service. It periodically scrapes
* all registered collectors and stores their data. With periodically scraping

View File

@ -3,4 +3,4 @@ set(client_target_name ${target_name}__client)
add_executable(${client_target_name} client.cpp)
set_target_properties(${client_target_name} PROPERTIES OUTPUT_NAME client)
target_link_libraries(${client_target_name} telemetry_lib)
target_link_libraries(${client_target_name} mg-requests telemetry_lib)

View File

@ -1,5 +1,6 @@
#include <gflags/gflags.h>
#include "requests/requests.hpp"
#include "telemetry/telemetry.hpp"
DEFINE_string(endpoint, "http://127.0.0.1:9000/",
@ -14,7 +15,7 @@ int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
telemetry::Init();
requests::Init();
telemetry::Telemetry telemetry(FLAGS_endpoint, FLAGS_storage_directory,
std::chrono::seconds(FLAGS_interval), 1);