Change Kafka transform script download logic

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1505
This commit is contained in:
Matej Ferencevic 2018-07-24 11:20:47 +02:00
parent 0681040395
commit 5c13c2d22c
5 changed files with 17 additions and 19 deletions

View File

@ -33,7 +33,7 @@ Consumer::Consumer(
std::function<void(const std::vector<std::string> &)> stream_writer)
: info_(info),
stream_writer_(stream_writer),
transform_(info.transform_uri, transform_script_path) {
transform_(transform_script_path) {
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::string error;

View File

@ -7,6 +7,7 @@
#include <json/json.hpp>
#include "integrations/kafka/exceptions.hpp"
#include "requests/requests.hpp"
#include "utils/file.hpp"
namespace integrations {
@ -122,12 +123,12 @@ void Streams::Recover() {
}
StreamInfo info = Deserialize(data);
Create(info);
Create(info, false);
if (info.is_running) Start(info.stream_name, info.limit_batches);
}
}
void Streams::Create(const StreamInfo &info) {
void Streams::Create(const StreamInfo &info, bool download_transform_script) {
std::lock_guard<std::mutex> g(mutex_);
if (consumers_.find(info.stream_name) != consumers_.end())
throw StreamExistsException(info.stream_name);
@ -142,11 +143,18 @@ void Streams::Create(const StreamInfo &info) {
throw TransformScriptCouldNotBeCreatedException(info.stream_name);
}
// Download the transform script.
auto transform_script_path = GetTransformScriptPath(info.stream_name);
if (download_transform_script &&
!requests::CreateAndDownloadFile(info.transform_uri,
transform_script_path)) {
throw TransformScriptDownloadException(info.transform_uri);
}
try {
consumers_.emplace(
std::piecewise_construct, std::forward_as_tuple(info.stream_name),
std::forward_as_tuple(info, GetTransformScriptPath(info.stream_name),
stream_writer_));
std::forward_as_tuple(info, transform_script_path, stream_writer_));
} catch (const KafkaStreamException &e) {
// If we failed to create the consumer, remove the persisted metadata.
metadata_store_.Delete(info.stream_name);

View File

@ -18,7 +18,7 @@ class Streams final {
void Recover();
void Create(const StreamInfo &info);
void Create(const StreamInfo &info, bool download_transform_script = true);
void Drop(const std::string &stream_name);

View File

@ -1,19 +1,10 @@
#include "integrations/kafka/transform.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "requests/requests.hpp"
namespace integrations {
namespace kafka {
Transform::Transform(const std::string &transform_script_uri,
const std::string &transform_script_path)
: transform_script_path_(transform_script_path) {
if (!requests::CreateAndDownloadFile(transform_script_uri,
transform_script_path)) {
throw TransformScriptDownloadException(transform_script_uri);
}
}
Transform::Transform(const std::string &transform_script_path)
: transform_script_path_(transform_script_path) {}
std::vector<std::string> Transform::Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {

View File

@ -11,8 +11,7 @@ namespace kafka {
class Transform final {
public:
Transform(const std::string &transform_script_uri,
const std::string &transform_script_path);
Transform(const std::string &transform_script_path);
std::vector<std::string> Apply(
const std::vector<std::unique_ptr<RdKafka::Message>> &batch);