From 5c13c2d22cb66035c6564300f944084afe6dd667 Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 24 Jul 2018 11:20:47 +0200
Subject: [PATCH] Change Kafka transform script download logic

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1505
---
 src/integrations/kafka/consumer.cpp  |  2 +-
 src/integrations/kafka/streams.cpp   | 16 ++++++++++++----
 src/integrations/kafka/streams.hpp   |  2 +-
 src/integrations/kafka/transform.cpp | 13 ++-----------
 src/integrations/kafka/transform.hpp |  3 +--
 5 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index b2136910f..6947a1765 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -33,7 +33,7 @@ Consumer::Consumer(
     std::function<void(const std::vector<std::string> &)> stream_writer)
     : info_(info),
       stream_writer_(stream_writer),
-      transform_(info.transform_uri, transform_script_path) {
+      transform_(transform_script_path) {
   std::unique_ptr<RdKafka::Conf> conf(
       RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
   std::string error;
diff --git a/src/integrations/kafka/streams.cpp b/src/integrations/kafka/streams.cpp
index 49be02223..cc0165c06 100644
--- a/src/integrations/kafka/streams.cpp
+++ b/src/integrations/kafka/streams.cpp
@@ -7,6 +7,7 @@
 #include <json/json.hpp>
 
 #include "integrations/kafka/exceptions.hpp"
+#include "requests/requests.hpp"
 #include "utils/file.hpp"
 
 namespace integrations {
@@ -122,12 +123,12 @@ void Streams::Recover() {
     }
 
     StreamInfo info = Deserialize(data);
-    Create(info);
+    Create(info, false);
     if (info.is_running) Start(info.stream_name, info.limit_batches);
   }
 }
 
-void Streams::Create(const StreamInfo &info) {
+void Streams::Create(const StreamInfo &info, bool download_transform_script) {
   std::lock_guard<std::mutex> g(mutex_);
   if (consumers_.find(info.stream_name) != consumers_.end())
     throw StreamExistsException(info.stream_name);
@@ -142,11 +143,18 @@ void Streams::Create(const StreamInfo &info) {
     throw TransformScriptCouldNotBeCreatedException(info.stream_name);
   }
 
+  // Download the transform script.
+  auto transform_script_path = GetTransformScriptPath(info.stream_name);
+  if (download_transform_script &&
+      !requests::CreateAndDownloadFile(info.transform_uri,
+                                       transform_script_path)) {
+    throw TransformScriptDownloadException(info.transform_uri);
+  }
+
   try {
     consumers_.emplace(
         std::piecewise_construct, std::forward_as_tuple(info.stream_name),
-        std::forward_as_tuple(info, GetTransformScriptPath(info.stream_name),
-                              stream_writer_));
+        std::forward_as_tuple(info, transform_script_path, stream_writer_));
   } catch (const KafkaStreamException &e) {
     // If we failed to create the consumer, remove the persisted metadata.
     metadata_store_.Delete(info.stream_name);
diff --git a/src/integrations/kafka/streams.hpp b/src/integrations/kafka/streams.hpp
index d9d3de77f..83e846203 100644
--- a/src/integrations/kafka/streams.hpp
+++ b/src/integrations/kafka/streams.hpp
@@ -18,7 +18,7 @@ class Streams final {
 
   void Recover();
 
-  void Create(const StreamInfo &info);
+  void Create(const StreamInfo &info, bool download_transform_script = true);
 
   void Drop(const std::string &stream_name);
 
diff --git a/src/integrations/kafka/transform.cpp b/src/integrations/kafka/transform.cpp
index c157bd2cd..a7d86fec8 100644
--- a/src/integrations/kafka/transform.cpp
+++ b/src/integrations/kafka/transform.cpp
@@ -1,19 +1,10 @@
 #include "integrations/kafka/transform.hpp"
 
-#include "integrations/kafka/exceptions.hpp"
-#include "requests/requests.hpp"
-
 namespace integrations {
 namespace kafka {
 
-Transform::Transform(const std::string &transform_script_uri,
-                     const std::string &transform_script_path)
-    : transform_script_path_(transform_script_path) {
-  if (!requests::CreateAndDownloadFile(transform_script_uri,
-                                       transform_script_path)) {
-    throw TransformScriptDownloadException(transform_script_uri);
-  }
-}
+Transform::Transform(const std::string &transform_script_path)
+    : transform_script_path_(transform_script_path) {}
 
 std::vector<std::string> Transform::Apply(
     const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {
diff --git a/src/integrations/kafka/transform.hpp b/src/integrations/kafka/transform.hpp
index 1e4804d1f..6a632ef72 100644
--- a/src/integrations/kafka/transform.hpp
+++ b/src/integrations/kafka/transform.hpp
@@ -11,8 +11,7 @@ namespace kafka {
 
 class Transform final {
  public:
-  Transform(const std::string &transform_script_uri,
-            const std::string &transform_script_path);
+  Transform(const std::string &transform_script_path);
 
   std::vector<std::string> Apply(
       const std::vector<std::unique_ptr<RdKafka::Message>> &batch);