From 4c27596fddf3cf0c2d5dbe5cbcab3449dbda1ed0 Mon Sep 17 00:00:00 2001
From: Matija Santl <matija.santl@memgraph.com>
Date: Fri, 6 Jul 2018 09:28:05 +0200
Subject: [PATCH] Implement kafka transform functionality

Summary:
First iteration of implementing Kafka support.
Currently, memgraph streams don't yet use the transform script provided in the
`CREATE STREAM` query.

There is a manual test that serves as a proof of concept, which we'll use to
fully wire Kafka into memgraph.
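
The wiring in memgraph_bolt.cpp roughly looks like this (simplified sketch;
`db`, `interpreter` and `durability_dir` stand in for the session data and
flags used in the actual code, and error handling is omitted):

  integrations::kafka::Streams streams{
      durability_dir / "streams",
      [&](const std::vector<std::string> &queries) {
        database::GraphDbAccessor dba(db);
        for (const auto &query : queries) {
          KafkaResultStream stream;
          interpreter(query, dba, {}, false).PullAll(stream);
        }
        dba.Commit();
      }};
  streams.Recover();  // restore persisted streams, restart the running ones
  interpreter.kafka_streams_ = &streams;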

Since streams need to download the transform script, I moved the cURL
initialization out of telemetry and into a new mg-requests library.
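
The new mg-requests library is initialized once per `main` and exposes the
download helper that the transform uses to fetch its script (sketch; the URL
and target path below are placeholders):

  requests::Init();  // replaces telemetry::Init(), call exactly once
  if (!requests::CreateAndDownloadFile("http://example.com/transform.py",
                                       "/path/to/streams/transform/my.py")) {
    LOG(ERROR) << "Couldn't download the transform script";
  }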

Reviewers: teon.banek, mferencevic

Reviewed By: mferencevic

Subscribers: ipaljak, pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1491
---
 init                                       |   3 +-
 src/CMakeLists.txt                         |   3 +-
 src/database/graph_db.cpp                  |   7 -
 src/database/graph_db.hpp                  |   3 -
 src/integrations/CMakeLists.txt            |   6 +-
 src/integrations/kafka/consumer.cpp        |  45 +++--
 src/integrations/kafka/consumer.hpp        |  19 +-
 src/integrations/kafka/exceptions.hpp      |  62 ++++++
 src/integrations/kafka/streams.cpp         | 222 +++++++++++++++++++--
 src/integrations/kafka/streams.hpp         |  40 ++--
 src/integrations/kafka/transform.cpp       |  31 +++
 src/integrations/kafka/transform.hpp       |  27 +++
 src/memgraph_bolt.cpp                      |  73 ++++++-
 src/query/context.hpp                      |   8 +
 src/query/interpreter.cpp                  |   1 +
 src/query/interpreter.hpp                  |   8 +
 src/query/plan/operator.cpp                |  64 +++---
 src/query/plan/operator.lcp                |   1 +
 src/query/plan/rule_based_planner.hpp      |   4 +-
 src/requests/CMakeLists.txt                |   8 +
 src/{telemetry => requests}/requests.cpp   |  61 +++++-
 src/requests/requests.hpp                  |  41 ++++
 src/telemetry/CMakeLists.txt               |   6 +-
 src/telemetry/requests.hpp                 |  26 ---
 src/telemetry/telemetry.cpp                |   6 +-
 src/telemetry/telemetry.hpp                |   8 -
 tests/integration/telemetry/CMakeLists.txt |   2 +-
 tests/integration/telemetry/client.cpp     |   3 +-
 28 files changed, 632 insertions(+), 156 deletions(-)
 create mode 100644 src/integrations/kafka/transform.cpp
 create mode 100644 src/integrations/kafka/transform.hpp
 create mode 100644 src/requests/CMakeLists.txt
 rename src/{telemetry => requests}/requests.cpp (51%)
 create mode 100644 src/requests/requests.hpp
 delete mode 100644 src/telemetry/requests.hpp

diff --git a/init b/init
index 931b9d0e2..7f441c5e1 100755
--- a/init
+++ b/init
@@ -12,7 +12,8 @@ required_pkgs=(git arcanist # source code control
                libboost-iostreams-dev
                libboost-serialization-dev
                python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests
-               uuid-dev libcurl4-openssl-dev # for telemetry
+               uuid-dev # mg-utils
+               libcurl4-openssl-dev # mg-requests
                sbcl # for custom Lisp C++ preprocessing
               )
 
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0a495230b..e75c8397b 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -2,6 +2,7 @@
 
 # add memgraph sub libraries, ordered by dependency
 add_subdirectory(utils)
+add_subdirectory(requests)
 add_subdirectory(integrations)
 add_subdirectory(io)
 add_subdirectory(telemetry)
@@ -189,7 +190,7 @@ set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
     antlr_opencypher_parser_lib dl glog gflags capnp kj
     ${Boost_IOSTREAMS_LIBRARY_RELEASE}
     ${Boost_SERIALIZATION_LIBRARY_RELEASE}
-    mg-utils mg-io mg-integrations mg-communication)
+    mg-utils mg-io mg-integrations mg-requests mg-communication)
 
 if (USE_LTALLOC)
     list(APPEND MEMGRAPH_ALL_LIBS ltalloc)
diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp
index ddf229b18..bb5dce37f 100644
--- a/src/database/graph_db.cpp
+++ b/src/database/graph_db.cpp
@@ -56,9 +56,6 @@ class PrivateBase : public GraphDb {
 
   Storage &storage() override { return *storage_; }
   durability::WriteAheadLog &wal() override { return wal_; }
-  integrations::kafka::Streams &kafka_streams() override {
-    return kafka_streams_;
-  }
   int WorkerId() const override { return config_.worker_id; }
 
   // Makes a local snapshot from the visibility of accessor
@@ -101,7 +98,6 @@ class PrivateBase : public GraphDb {
   durability::WriteAheadLog wal_{config_.worker_id,
                                  config_.durability_directory,
                                  config_.durability_enabled};
-  integrations::kafka::Streams kafka_streams_;
 };
 
 template <template <typename TId> class TMapper>
@@ -449,9 +445,6 @@ PublicBase::~PublicBase() {
 GraphDb::Type PublicBase::type() const { return impl_->type(); }
 Storage &PublicBase::storage() { return impl_->storage(); }
 durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
-integrations::kafka::Streams &PublicBase::kafka_streams() {
-  return impl_->kafka_streams();
-}
 tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
 ConcurrentIdMapper<Label> &PublicBase::label_mapper() {
   return impl_->label_mapper();
diff --git a/src/database/graph_db.hpp b/src/database/graph_db.hpp
index 85a22519e..a5de5bab0 100644
--- a/src/database/graph_db.hpp
+++ b/src/database/graph_db.hpp
@@ -8,7 +8,6 @@
 #include "database/storage.hpp"
 #include "database/storage_gc.hpp"
 #include "durability/wal.hpp"
-#include "integrations/kafka/streams.hpp"
 #include "io/network/endpoint.hpp"
 #include "storage/concurrent_id_mapper.hpp"
 #include "storage/types.hpp"
@@ -93,7 +92,6 @@ class GraphDb {
   virtual Type type() const = 0;
   virtual Storage &storage() = 0;
   virtual durability::WriteAheadLog &wal() = 0;
-  virtual integrations::kafka::Streams &kafka_streams() = 0;
   virtual tx::Engine &tx_engine() = 0;
   virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
   virtual storage::ConcurrentIdMapper<storage::EdgeType>
@@ -151,7 +149,6 @@ class PublicBase : public GraphDb {
   Type type() const override;
   Storage &storage() override;
   durability::WriteAheadLog &wal() override;
-  integrations::kafka::Streams &kafka_streams() override;
   tx::Engine &tx_engine() override;
   storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
   storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;
diff --git a/src/integrations/CMakeLists.txt b/src/integrations/CMakeLists.txt
index da83b0e42..9a78112f1 100644
--- a/src/integrations/CMakeLists.txt
+++ b/src/integrations/CMakeLists.txt
@@ -1,7 +1,9 @@
 set(integrations_src_files
     kafka/streams.cpp
+    kafka/transform.cpp
     kafka/consumer.cpp)
 
 add_library(mg-integrations STATIC ${integrations_src_files})
-target_link_libraries(mg-integrations Threads::Threads fmt glog gflags
-                      librdkafka++ librdkafka zlib)
+target_link_libraries(mg-integrations stdc++fs Threads::Threads fmt
+                      glog gflags librdkafka++ librdkafka zlib json)
+target_link_libraries(mg-integrations mg-utils mg-requests)
diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index e9c178ab8..b2136910f 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -1,10 +1,12 @@
 #include "integrations/kafka/consumer.hpp"
 
 #include <chrono>
+#include <thread>
 
 #include "glog/logging.h"
 
 #include "integrations/kafka/exceptions.hpp"
+#include "utils/on_scope_exit.hpp"
 
 namespace integrations {
 namespace kafka {
@@ -12,7 +14,7 @@ namespace kafka {
 using namespace std::chrono_literals;
 
 constexpr int64_t kDefaultBatchIntervalMillis = 100;
-constexpr int64_t kDefaultBatchSize = 10;
+constexpr int64_t kDefaultBatchSize = 1000;
 constexpr int64_t kDefaultTestBatchLimit = 1;
 
 void Consumer::event_cb(RdKafka::Event &event) {
@@ -26,7 +28,12 @@ void Consumer::event_cb(RdKafka::Event &event) {
   }
 }
 
-Consumer::Consumer(const StreamInfo &info) : info_(info) {
+Consumer::Consumer(
+    const StreamInfo &info, const std::string &transform_script_path,
+    std::function<void(const std::vector<std::string> &)> stream_writer)
+    : info_(info),
+      stream_writer_(stream_writer),
+      transform_(info.transform_uri, transform_script_path) {
   std::unique_ptr<RdKafka::Conf> conf(
       RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
   std::string error;
@@ -94,17 +101,25 @@ Consumer::Consumer(const StreamInfo &info) : info_(info) {
 void Consumer::StopConsuming() {
   is_running_.store(false);
   if (thread_.joinable()) thread_.join();
+
+  // Set limit_batches to nullopt since it's not running anymore.
+  info_.limit_batches = std::experimental::nullopt;
 }
 
 void Consumer::StartConsuming(
     std::experimental::optional<int64_t> limit_batches) {
+  info_.limit_batches = limit_batches;
+  is_running_.store(true);
+
   thread_ = std::thread([this, limit_batches]() {
     int64_t batch_count = 0;
-    is_running_.store(true);
 
     while (is_running_) {
+      // TODO (msantl): Figure out what to do with potential exceptions here.
       auto batch = this->GetBatch();
-      // TODO (msantl): transform the batch
+      auto transformed_batch = transform_.Apply(batch);
+      stream_writer_(transformed_batch);
+
       if (limit_batches != std::experimental::nullopt) {
         if (limit_batches <= ++batch_count) {
           is_running_.store(false);
@@ -117,7 +132,6 @@ void Consumer::StartConsuming(
 
 std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
   std::vector<std::unique_ptr<RdKafka::Message>> batch;
-  bool run_batch = false;
   auto start = std::chrono::system_clock::now();
   int64_t remaining_timeout_in_ms =
       info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
@@ -125,7 +139,8 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
 
   batch.reserve(batch_size);
 
-  for (int64_t i = 0; i < batch_size; ++i) {
+  bool run_batch = true;
+  for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size; ++i) {
     std::unique_ptr<RdKafka::Message> msg(
         consumer_->consume(remaining_timeout_in_ms));
     switch (msg->err()) {
@@ -151,10 +166,6 @@ std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
     auto now = std::chrono::system_clock::now();
     auto took =
         std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
-    if (took.count() >= remaining_timeout_in_ms) {
-      break;
-    }
-
     remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
     start = now;
   }
@@ -186,7 +197,7 @@ void Consumer::Stop() {
   StopConsuming();
 }
 
-void Consumer::StartIfNotStopped() {
+void Consumer::StartIfStopped() {
   if (!consumer_) {
     throw ConsumerNotAvailableException(info_.stream_name);
   }
@@ -196,7 +207,7 @@ void Consumer::StartIfNotStopped() {
   }
 }
 
-void Consumer::StopIfNotRunning() {
+void Consumer::StopIfRunning() {
   if (!consumer_) {
     throw ConsumerNotAvailableException(info_.stream_name);
   }
@@ -221,16 +232,16 @@ std::vector<std::string> Consumer::Test(
 
   is_running_.store(true);
 
+  utils::OnScopeExit cleanup([this]() { is_running_.store(false); });
+
   for (int64_t i = 0; i < num_of_batches; ++i) {
     auto batch = GetBatch();
-    // TODO (msantl): transform the batch
+    auto transformed_batch = transform_.Apply(batch);
 
-    for (auto &result : batch) {
-      results.push_back(
-          std::string(reinterpret_cast<char *>(result->payload())));
+    for (auto &record : transformed_batch) {
+      results.emplace_back(std::move(record));
     }
   }
-  is_running_.store(false);
 
   return results;
 }
diff --git a/src/integrations/kafka/consumer.hpp b/src/integrations/kafka/consumer.hpp
index 0100cd85e..a128fd3e8 100644
--- a/src/integrations/kafka/consumer.hpp
+++ b/src/integrations/kafka/consumer.hpp
@@ -10,6 +10,8 @@
 
 #include "rdkafkacpp.h"
 
+#include "integrations/kafka/transform.hpp"
+
 namespace integrations {
 namespace kafka {
 
@@ -21,6 +23,8 @@ struct StreamInfo {
   std::experimental::optional<int64_t> batch_interval_in_ms;
   std::experimental::optional<int64_t> batch_size;
 
+  std::experimental::optional<int64_t> limit_batches;
+
   bool is_running = false;
 };
 
@@ -28,7 +32,8 @@ class Consumer final : public RdKafka::EventCb {
  public:
   Consumer() = delete;
 
-  explicit Consumer(const StreamInfo &info);
+  Consumer(const StreamInfo &info, const std::string &transform_script_path,
+           std::function<void(const std::vector<std::string> &)> stream_writer);
 
   Consumer(const Consumer &other) = delete;
   Consumer(Consumer &&other) = delete;
@@ -40,9 +45,9 @@ class Consumer final : public RdKafka::EventCb {
 
   void Stop();
 
-  void StartIfNotStopped();
+  void StartIfStopped();
 
-  void StopIfNotRunning();
+  void StopIfRunning();
 
   std::vector<std::string> Test(
       std::experimental::optional<int64_t> limit_batches);
@@ -50,9 +55,11 @@ class Consumer final : public RdKafka::EventCb {
   StreamInfo info();
 
  private:
-  void event_cb(RdKafka::Event &event) override;
-
   StreamInfo info_;
+  std::string transform_script_path_;
+  std::function<void(const std::vector<std::string> &)> stream_writer_;
+
+  Transform transform_;
 
   std::atomic<bool> is_running_{false};
   std::thread thread_;
@@ -61,6 +68,8 @@ class Consumer final : public RdKafka::EventCb {
                   std::function<void(RdKafka::KafkaConsumer *)>>
       consumer_;
 
+  void event_cb(RdKafka::Event &event) override;
+
   void StopConsuming();
 
   void StartConsuming(std::experimental::optional<int64_t> limit_batches);
diff --git a/src/integrations/kafka/exceptions.hpp b/src/integrations/kafka/exceptions.hpp
index 6f3f328e6..81b23f436 100644
--- a/src/integrations/kafka/exceptions.hpp
+++ b/src/integrations/kafka/exceptions.hpp
@@ -4,6 +4,8 @@
 
 #include <fmt/format.h>
 
+namespace integrations {
+namespace kafka {
 class KafkaStreamException : public utils::BasicException {
   using utils::BasicException::BasicException;
 };
@@ -22,6 +24,33 @@ class StreamDoesntExistException : public KafkaStreamException {
             fmt::format("Kafka stream {} doesn't exist.", stream_name)) {}
 };
 
+class StreamSerializationException : public KafkaStreamException {
+ public:
+  StreamSerializationException()
+      : KafkaStreamException("Failed to serialize stream data!") {}
+};
+
+class StreamDeserializationException : public KafkaStreamException {
+ public:
+  StreamDeserializationException()
+      : KafkaStreamException("Failed to deserialize stream data!") {}
+};
+
+class StreamMetadataCouldNotBeStored : public KafkaStreamException {
+ public:
+  explicit StreamMetadataCouldNotBeStored(const std::string &stream_name)
+      : KafkaStreamException(fmt::format(
+            "Couldn't persist stream metadata for stream {}", stream_name)) {}
+};
+
+class StreamMetadataCouldNotBeDeleted : public KafkaStreamException {
+ public:
+  explicit StreamMetadataCouldNotBeDeleted(const std::string &stream_name)
+      : KafkaStreamException(fmt::format(
+            "Couldn't delete persisted stream metadata for stream {}",
+            stream_name)) {}
+};
+
 class ConsumerFailedToInitializeException : public KafkaStreamException {
  public:
   ConsumerFailedToInitializeException(const std::string &stream_name,
@@ -57,3 +86,36 @@ class TopicNotFoundException : public KafkaStreamException {
       : KafkaStreamException(
             fmt::format("Kafka stream {}, topic not found", stream_name)) {}
 };
+
+class TransformScriptNotFoundException : public KafkaStreamException {
+ public:
+  explicit TransformScriptNotFoundException(const std::string &stream_name)
+      : KafkaStreamException(fmt::format(
+            "Couldn't find transform script for {}", stream_name)) {}
+};
+
+class TransformScriptDownloadException : public KafkaStreamException {
+ public:
+  explicit TransformScriptDownloadException(const std::string &transform_uri)
+      : KafkaStreamException(fmt::format(
+            "Couldn't get the transform script from {}", transform_uri)) {}
+};
+
+class TransformScriptCouldNotBeCreatedException : public KafkaStreamException {
+ public:
+  explicit TransformScriptCouldNotBeCreatedException(
+      const std::string &stream_name)
+      : KafkaStreamException(fmt::format(
+            "Couldn't create transform script for stream {}", stream_name)) {}
+};
+
+class TransformScriptCouldNotBeDeletedException : public KafkaStreamException {
+ public:
+  explicit TransformScriptCouldNotBeDeletedException(
+      const std::string &stream_name)
+      : KafkaStreamException(fmt::format(
+            "Couldn't delete transform script for stream {}", stream_name)) {}
+};
+
+}  // namespace kafka
+}  // namespace integrations
diff --git a/src/integrations/kafka/streams.cpp b/src/integrations/kafka/streams.cpp
index 9e34fa717..49be02223 100644
--- a/src/integrations/kafka/streams.cpp
+++ b/src/integrations/kafka/streams.cpp
@@ -1,60 +1,238 @@
 #include "integrations/kafka/streams.hpp"
+
+#include <cstdio>
+#include <experimental/filesystem>
+#include <experimental/optional>
+
+#include <json/json.hpp>
+
 #include "integrations/kafka/exceptions.hpp"
+#include "utils/file.hpp"
 
 namespace integrations {
 namespace kafka {
 
-void Streams::CreateStream(const StreamInfo &info) {
+namespace fs = std::experimental::filesystem;
+
+const std::string kMetadataDir = "metadata";
+const std::string kTransformDir = "transform";
+const std::string kTransformExt = ".py";
+
+namespace {
+
+nlohmann::json Serialize(const StreamInfo &info) {
+  nlohmann::json data = nlohmann::json::object();
+  data["stream_name"] = info.stream_name;
+  data["stream_uri"] = info.stream_uri;
+  data["stream_topic"] = info.stream_topic;
+  data["transform_uri"] = info.transform_uri;
+
+  if (info.batch_interval_in_ms) {
+    data["batch_interval_in_ms"] = info.batch_interval_in_ms.value();
+  } else {
+    data["batch_interval_in_ms"] = nullptr;
+  }
+
+  if (info.batch_size) {
+    data["batch_size"] = info.batch_size.value();
+  } else {
+    data["batch_size"] = nullptr;
+  }
+
+  if (info.limit_batches) {
+    data["limit_batches"] = info.limit_batches.value();
+  } else {
+    data["limit_batches"] = nullptr;
+  }
+
+  data["is_running"] = info.is_running;
+
+  return data;
+}
+
+StreamInfo Deserialize(const nlohmann::json &data) {
+  if (!data.is_object()) throw StreamDeserializationException();
+
+  StreamInfo info;
+
+  if (!data["stream_name"].is_string()) throw StreamDeserializationException();
+  info.stream_name = data["stream_name"];
+
+  if (!data["stream_uri"].is_string()) throw StreamDeserializationException();
+  info.stream_uri = data["stream_uri"];
+
+  if (!data["stream_topic"].is_string()) throw StreamDeserializationException();
+  info.stream_topic = data["stream_topic"];
+
+  if (!data["transform_uri"].is_string())
+    throw StreamDeserializationException();
+  info.transform_uri = data["transform_uri"];
+
+  if (data["batch_interval_in_ms"].is_number()) {
+    info.batch_interval_in_ms = data["batch_interval_in_ms"];
+  } else if (data["batch_interval_in_ms"].is_null()) {
+    info.batch_interval_in_ms = std::experimental::nullopt;
+  } else {
+    throw StreamDeserializationException();
+  }
+
+  if (data["batch_size"].is_number()) {
+    info.batch_size = data["batch_size"];
+  } else if (data["batch_size"].is_null()) {
+    info.batch_size = std::experimental::nullopt;
+  } else {
+    throw StreamDeserializationException();
+  }
+
+  if (!data["is_running"].is_boolean()) throw StreamDeserializationException();
+  info.is_running = data["is_running"];
+
+  if (data["limit_batches"].is_number()) {
+    info.limit_batches = data["limit_batches"];
+  } else if (data["limit_batches"].is_null()) {
+    info.limit_batches = std::experimental::nullopt;
+  } else {
+    throw StreamDeserializationException();
+  }
+
+  return info;
+}
+
+}  // namespace
+
+Streams::Streams(
+    const std::string &streams_directory,
+    std::function<void(const std::vector<std::string> &)> stream_writer)
+    : streams_directory_(streams_directory),
+      stream_writer_(stream_writer),
+      metadata_store_(fs::path(streams_directory) / kMetadataDir) {}
+
+void Streams::Recover() {
+  for (auto it = metadata_store_.begin(); it != metadata_store_.end(); ++it) {
+    // Check if the transform script also exists.
+    auto transform_script = GetTransformScriptPath(it->first);
+    if (!fs::exists(transform_script))
+      throw TransformScriptNotFoundException(it->first);
+
+    nlohmann::json data;
+    try {
+      data = nlohmann::json::parse(it->second);
+    } catch (const nlohmann::json::parse_error &e) {
+      throw StreamDeserializationException();
+    }
+
+    StreamInfo info = Deserialize(data);
+    Create(info);
+    if (info.is_running) Start(info.stream_name, info.limit_batches);
+  }
+}
+
+void Streams::Create(const StreamInfo &info) {
   std::lock_guard<std::mutex> g(mutex_);
   if (consumers_.find(info.stream_name) != consumers_.end())
     throw StreamExistsException(info.stream_name);
 
-  consumers_.emplace(info.stream_name, info);
+  // Store stream_info in metadata_store_.
+  if (!metadata_store_.Put(info.stream_name, Serialize(info).dump())) {
+    throw StreamMetadataCouldNotBeStored(info.stream_name);
+  }
+
+  // Make sure the transform directory exists or can be created.
+  if (!utils::EnsureDir(GetTransformScriptDir())) {
+    throw TransformScriptCouldNotBeCreatedException(info.stream_name);
+  }
+
+  try {
+    consumers_.emplace(
+        std::piecewise_construct, std::forward_as_tuple(info.stream_name),
+        std::forward_as_tuple(info, GetTransformScriptPath(info.stream_name),
+                              stream_writer_));
+  } catch (const KafkaStreamException &e) {
+    // If we failed to create the consumer, remove the persisted metadata.
+    metadata_store_.Delete(info.stream_name);
+    // Rethrow the exception.
+    throw;
+  }
 }
 
-void Streams::DropStream(const std::string &stream_name) {
+void Streams::Drop(const std::string &stream_name) {
   std::lock_guard<std::mutex> g(mutex_);
   auto find_it = consumers_.find(stream_name);
   if (find_it == consumers_.end())
     throw StreamDoesntExistException(stream_name);
 
+  // Erase and implicitly stop the consumer.
   consumers_.erase(find_it);
+
+  // Remove stream_info from metadata_store_.
+  if (!metadata_store_.Delete(stream_name)) {
+    throw StreamMetadataCouldNotBeDeleted(stream_name);
+  }
+
+  // Remove transform script.
+  if (std::remove(GetTransformScriptPath(stream_name).c_str())) {
+    throw TransformScriptNotFoundException(stream_name);
+  }
 }
 
-void Streams::StartStream(const std::string &stream_name,
-                          std::experimental::optional<int64_t> batch_limit) {
+void Streams::Start(const std::string &stream_name,
+                    std::experimental::optional<int64_t> limit_batches) {
   std::lock_guard<std::mutex> g(mutex_);
   auto find_it = consumers_.find(stream_name);
   if (find_it == consumers_.end())
     throw StreamDoesntExistException(stream_name);
 
-  find_it->second.Start(batch_limit);
+  find_it->second.Start(limit_batches);
+
+  // Store stream_info in metadata_store_.
+  if (!metadata_store_.Put(stream_name,
+                           Serialize(find_it->second.info()).dump())) {
+    throw StreamMetadataCouldNotBeStored(stream_name);
+  }
 }
 
-void Streams::StopStream(const std::string &stream_name) {
+void Streams::Stop(const std::string &stream_name) {
   std::lock_guard<std::mutex> g(mutex_);
   auto find_it = consumers_.find(stream_name);
   if (find_it == consumers_.end())
     throw StreamDoesntExistException(stream_name);
 
   find_it->second.Stop();
-}
 
-void Streams::StartAllStreams() {
-  std::lock_guard<std::mutex> g(mutex_);
-  for (auto &consumer_kv : consumers_) {
-    consumer_kv.second.StartIfNotStopped();
+  // Store stream_info in metadata_store_.
+  if (!metadata_store_.Put(stream_name,
+                           Serialize(find_it->second.info()).dump())) {
+    throw StreamMetadataCouldNotBeStored(stream_name);
   }
 }
 
-void Streams::StopAllStreams() {
+void Streams::StartAll() {
   std::lock_guard<std::mutex> g(mutex_);
   for (auto &consumer_kv : consumers_) {
-    consumer_kv.second.StopIfNotRunning();
+    consumer_kv.second.StartIfStopped();
+
+    // Store stream_info in metadata_store_.
+    if (!metadata_store_.Put(consumer_kv.first,
+                             Serialize(consumer_kv.second.info()).dump())) {
+      throw StreamMetadataCouldNotBeStored(consumer_kv.first);
+    }
   }
 }
 
-std::vector<StreamInfo> Streams::ShowStreams() {
+void Streams::StopAll() {
+  std::lock_guard<std::mutex> g(mutex_);
+  for (auto &consumer_kv : consumers_) {
+    consumer_kv.second.StopIfRunning();
+
+    // Store stream_info in metadata_store_.
+    if (!metadata_store_.Put(consumer_kv.first,
+                             Serialize(consumer_kv.second.info()).dump())) {
+      throw StreamMetadataCouldNotBeStored(consumer_kv.first);
+    }
+  }
+}
+
+std::vector<StreamInfo> Streams::Show() {
   std::vector<StreamInfo> streams;
   std::lock_guard<std::mutex> g(mutex_);
   for (auto &consumer_kv : consumers_) {
@@ -64,15 +242,23 @@ std::vector<StreamInfo> Streams::ShowStreams() {
   return streams;
 }
 
-std::vector<std::string> Streams::TestStream(
+std::vector<std::string> Streams::Test(
     const std::string &stream_name,
-    std::experimental::optional<int64_t> batch_limit) {
+    std::experimental::optional<int64_t> limit_batches) {
   std::lock_guard<std::mutex> g(mutex_);
   auto find_it = consumers_.find(stream_name);
   if (find_it == consumers_.end())
     throw StreamDoesntExistException(stream_name);
 
-  return find_it->second.Test(batch_limit);
+  return find_it->second.Test(limit_batches);
+}
+
+std::string Streams::GetTransformScriptDir() {
+  return fs::path(streams_directory_) / kTransformDir;
+}
+
+std::string Streams::GetTransformScriptPath(const std::string &stream_name) {
+  return fs::path(GetTransformScriptDir()) / (stream_name + kTransformExt);
 }
 
 }  // namespace kafka
diff --git a/src/integrations/kafka/streams.hpp b/src/integrations/kafka/streams.hpp
index 5b485f7cc..d9d3de77f 100644
--- a/src/integrations/kafka/streams.hpp
+++ b/src/integrations/kafka/streams.hpp
@@ -6,37 +6,49 @@
 #include <mutex>
 #include <unordered_map>
 
+#include "storage/kvstore.hpp"
+
 namespace integrations {
 namespace kafka {
 
 class Streams final {
  public:
-  void CreateStream(const StreamInfo &info);
+  Streams(const std::string &streams_directory,
+          std::function<void(const std::vector<std::string> &)> stream_writer);
 
-  void DropStream(const std::string &stream_name);
+  void Recover();
 
-  void StartStream(const std::string &stream_name,
-                   std::experimental::optional<int64_t> batch_limit =
-                       std::experimental::nullopt);
+  void Create(const StreamInfo &info);
 
-  void StopStream(const std::string &stream_name);
+  void Drop(const std::string &stream_name);
 
-  void StartAllStreams();
+  void Start(const std::string &stream_name,
+             std::experimental::optional<int64_t> batch_limit =
+                 std::experimental::nullopt);
 
-  void StopAllStreams();
+  void Stop(const std::string &stream_name);
 
-  std::vector<StreamInfo> ShowStreams();
+  void StartAll();
 
-  std::vector<std::string> TestStream(
-      const std::string &stream_name,
-      std::experimental::optional<int64_t> batch_limit =
-          std::experimental::nullopt);
+  void StopAll();
+
+  std::vector<StreamInfo> Show();
+
+  std::vector<std::string> Test(const std::string &stream_name,
+                                std::experimental::optional<int64_t>
+                                    batch_limit = std::experimental::nullopt);
 
  private:
+  std::string streams_directory_;
+  std::function<void(const std::vector<std::string> &)> stream_writer_;
+
+  storage::KVStore metadata_store_;
+
   std::mutex mutex_;
   std::unordered_map<std::string, Consumer> consumers_;
 
-  // TODO (msantl): persist stream storage
+  std::string GetTransformScriptDir();
+  std::string GetTransformScriptPath(const std::string &stream_name);
 };
 
 }  // namespace kafka
diff --git a/src/integrations/kafka/transform.cpp b/src/integrations/kafka/transform.cpp
new file mode 100644
index 000000000..c157bd2cd
--- /dev/null
+++ b/src/integrations/kafka/transform.cpp
@@ -0,0 +1,31 @@
+#include "integrations/kafka/transform.hpp"
+
+#include "integrations/kafka/exceptions.hpp"
+#include "requests/requests.hpp"
+
+namespace integrations {
+namespace kafka {
+
+Transform::Transform(const std::string &transform_script_uri,
+                     const std::string &transform_script_path)
+    : transform_script_path_(transform_script_path) {
+  if (!requests::CreateAndDownloadFile(transform_script_uri,
+                                       transform_script_path)) {
+    throw TransformScriptDownloadException(transform_script_uri);
+  }
+}
+
+std::vector<std::string> Transform::Apply(
+    const std::vector<std::unique_ptr<RdKafka::Message>> &batch) {
+  // TODO (msantl): dummy transform, do the actual transform later @mferencevic
+  std::vector<std::string> transformed_batch;
+  transformed_batch.reserve(batch.size());
+  for (auto &record : batch) {
+    transformed_batch.push_back(reinterpret_cast<char *>(record->payload()));
+  }
+
+  return transformed_batch;
+}
+
+}  // namespace kafka
+}  // namespace integrations
diff --git a/src/integrations/kafka/transform.hpp b/src/integrations/kafka/transform.hpp
new file mode 100644
index 000000000..1e4804d1f
--- /dev/null
+++ b/src/integrations/kafka/transform.hpp
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "rdkafkacpp.h"
+
+namespace integrations {
+namespace kafka {
+
+class Transform final {
+ public:
+  Transform(const std::string &transform_script_uri,
+            const std::string &transform_script_path);
+
+  std::vector<std::string> Apply(
+      const std::vector<std::unique_ptr<RdKafka::Message>> &batch);
+
+  auto transform_script_path() const { return transform_script_path_; }
+
+ private:
+  std::string transform_script_path_;
+};
+
+}  // namespace kafka
+}  // namespace integrations
diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp
index 86cdf4bfd..60ef7242a 100644
--- a/src/memgraph_bolt.cpp
+++ b/src/memgraph_bolt.cpp
@@ -16,9 +16,12 @@
 #include "database/graph_db.hpp"
 #include "distributed/pull_rpc_clients.hpp"
 #include "glue/conversion.hpp"
+#include "integrations/kafka/exceptions.hpp"
+#include "integrations/kafka/streams.hpp"
 #include "query/exceptions.hpp"
 #include "query/interpreter.hpp"
 #include "query/transaction_engine.hpp"
+#include "requests/requests.hpp"
 #include "stats/stats.hpp"
 #include "telemetry/telemetry.hpp"
 #include "utils/flag_validation.hpp"
@@ -140,6 +143,17 @@ class BoltSession final
 using ServerT = communication::Server<BoltSession, SessionData>;
 using communication::ServerContext;
 
+/**
+ * Class that implements ResultStream API for Kafka.
+ *
+ * Kafka doesn't need to stream the import results back to the client, so we
+ * don't need any functionality here.
+ */
+class KafkaResultStream {
+ public:
+  void Result(const std::vector<query::TypedValue> &) {}
+};
+
 // Needed to correctly handle memgraph destruction from a signal handler.
 // Without having some sort of a flag, it is possible that a signal is handled
 // when we are exiting main, inside destructors of database::GraphDb and
@@ -239,6 +253,8 @@ int WithInit(int argc, char **argv,
                      << " MB left.";
     });
   }
+  requests::Init();
+
   memgraph_main();
   return 0;
 }
@@ -248,6 +264,34 @@ void SingleNodeMain() {
   database::SingleNode db;
   SessionData session_data{db};
 
+  auto stream_writer =
+      [&session_data](const std::vector<std::string> &queries) {
+        database::GraphDbAccessor dba(session_data.db);
+        for (auto &query : queries) {
+          KafkaResultStream stream;
+          try {
+            session_data.interpreter(query, dba, {}, false).PullAll(stream);
+          } catch (const query::QueryException &e) {
+            LOG(ERROR) << e.what();
+          }
+        }
+        dba.Commit();
+      };
+
+  integrations::kafka::Streams kafka_streams{
+      std::experimental::filesystem::path(FLAGS_durability_directory) /
+          "streams",
+      stream_writer};
+
+  try {
+    // Recover possible streams.
+    kafka_streams.Recover();
+  } catch (const integrations::kafka::KafkaStreamException &e) {
+    LOG(ERROR) << e.what();
+  }
+
+  session_data.interpreter.kafka_streams_ = &kafka_streams;
+
   ServerContext context;
   std::string service_name = "Bolt";
   if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
@@ -262,7 +306,6 @@ void SingleNodeMain() {
   // Setup telemetry
   std::experimental::optional<telemetry::Telemetry> telemetry;
   if (FLAGS_telemetry_enabled) {
-    telemetry::Init();
     telemetry.emplace(
         "https://telemetry.memgraph.com/88b5e7e8-746a-11e8-9f85-538a9e9690cc/",
         std::experimental::filesystem::path(FLAGS_durability_directory) /
@@ -310,6 +353,34 @@ void MasterMain() {
   database::Master db;
   SessionData session_data{db};
 
+  auto stream_writer =
+      [&session_data](const std::vector<std::string> &queries) {
+        database::GraphDbAccessor dba(session_data.db);
+        for (auto &query : queries) {
+          KafkaResultStream stream;
+          try {
+            session_data.interpreter(query, dba, {}, false).PullAll(stream);
+          } catch (const query::QueryException &e) {
+            LOG(ERROR) << e.what();
+          }
+        }
+        dba.Commit();
+      };
+
+  integrations::kafka::Streams kafka_streams{
+      std::experimental::filesystem::path(FLAGS_durability_directory) /
+          "streams",
+      stream_writer};
+
+  try {
+    // Recover possible streams.
+    kafka_streams.Recover();
+  } catch (const integrations::kafka::KafkaStreamException &e) {
+    LOG(ERROR) << e.what();
+  }
+
+  session_data.interpreter.kafka_streams_ = &kafka_streams;
+
   ServerContext context;
   std::string service_name = "Bolt";
   if (FLAGS_key_file != "" && FLAGS_cert_file != "") {
diff --git a/src/query/context.hpp b/src/query/context.hpp
index 775d7ff68..9f6f79ebf 100644
--- a/src/query/context.hpp
+++ b/src/query/context.hpp
@@ -5,6 +5,12 @@
 #include "query/frontend/semantic/symbol_table.hpp"
 #include "query/parameters.hpp"
 
+namespace integrations {
+namespace kafka {
+class Streams;
+}  // namespace kafka
+}  // namespace integrations
+
 namespace query {
 
 class Context {
@@ -25,6 +31,8 @@ class Context {
   bool in_explicit_transaction_ = false;
   bool is_index_created_ = false;
   int64_t timestamp_{-1};
+
+  integrations::kafka::Streams *kafka_streams_ = nullptr;
 };
 
 }  // namespace query
diff --git a/src/query/interpreter.cpp b/src/query/interpreter.cpp
index 5c5df863b..0cbbf1869 100644
--- a/src/query/interpreter.cpp
+++ b/src/query/interpreter.cpp
@@ -61,6 +61,7 @@ Interpreter::Results Interpreter::operator()(
   ctx.timestamp_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                        std::chrono::system_clock::now().time_since_epoch())
                        .count();
+  ctx.kafka_streams_ = kafka_streams_;
 
   // query -> stripped query
   StrippedQuery stripped(query);
diff --git a/src/query/interpreter.hpp b/src/query/interpreter.hpp
index 8ab963292..f97f3bd05 100644
--- a/src/query/interpreter.hpp
+++ b/src/query/interpreter.hpp
@@ -20,6 +20,12 @@ namespace distributed {
 class PlanDispatcher;
 }
 
+namespace integrations {
+namespace kafka {
+class Streams;
+}  // namespace kafka
+}  // namespace integrations
+
 namespace query {
 
 class Interpreter {
@@ -159,6 +165,8 @@ class Interpreter {
                      const std::map<std::string, TypedValue> &params,
                      bool in_explicit_transaction);
 
+  integrations::kafka::Streams *kafka_streams_ = nullptr;
+
  private:
   ConcurrentMap<HashType, AstStorage> ast_cache_;
   PlanCacheT plan_cache_;
diff --git a/src/query/plan/operator.cpp b/src/query/plan/operator.cpp
index 2f9a82fcc..be5d0e24e 100644
--- a/src/query/plan/operator.cpp
+++ b/src/query/plan/operator.cpp
@@ -16,12 +16,14 @@
 #include "boost/serialization/export.hpp"
 #include "glog/logging.h"
 
+#include "communication/result_stream_faker.hpp"
 #include "database/graph_db_accessor.hpp"
 #include "distributed/bfs_rpc_clients.hpp"
 #include "distributed/pull_rpc_clients.hpp"
 #include "distributed/updates_rpc_clients.hpp"
 #include "distributed/updates_rpc_server.hpp"
 #include "integrations/kafka/exceptions.hpp"
+#include "integrations/kafka/streams.hpp"
 #include "query/context.hpp"
 #include "query/exceptions.hpp"
 #include "query/frontend/ast/ast.hpp"
@@ -3916,8 +3918,8 @@ class CreateStreamCursor : public Cursor {
   using StreamInfo = integrations::kafka::StreamInfo;
 
  public:
-  CreateStreamCursor(const CreateStream &self, database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+  CreateStreamCursor(const CreateStream &self, database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -3948,8 +3950,8 @@ class CreateStreamCursor : public Cursor {
       info.batch_interval_in_ms = batch_interval_in_ms;
       info.batch_size = batch_size;
 
-      db_.db().kafka_streams().CreateStream(info);
-    } catch (const KafkaStreamException &e) {
+      ctx.kafka_streams_->Create(info);
+    } catch (const integrations::kafka::KafkaStreamException &e) {
       throw QueryRuntimeException(e.what());
     }
 
@@ -3960,7 +3962,6 @@ class CreateStreamCursor : public Cursor {
 
  private:
   const CreateStream &self_;
-  database::GraphDbAccessor &db_;
 };
 
 std::unique_ptr<Cursor> CreateStream::MakeCursor(
@@ -3975,8 +3976,8 @@ WITHOUT_SINGLE_INPUT(DropStream)
 
 class DropStreamCursor : public Cursor {
  public:
-  DropStreamCursor(const DropStream &self, database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+  DropStreamCursor(const DropStream &self, database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -3984,8 +3985,8 @@ class DropStreamCursor : public Cursor {
     }
 
     try {
-      db_.db().kafka_streams().DropStream(self_.stream_name());
-    } catch (const KafkaStreamException &e) {
+      ctx.kafka_streams_->Drop(self_.stream_name());
+    } catch (const integrations::kafka::KafkaStreamException &e) {
       throw QueryRuntimeException(e.what());
     }
     return false;
@@ -3995,7 +3996,6 @@ class DropStreamCursor : public Cursor {
 
  private:
   const DropStream &self_;
-  database::GraphDbAccessor &db_;
 };
 
 std::unique_ptr<Cursor> DropStream::MakeCursor(
@@ -4021,8 +4021,8 @@ std::vector<Symbol> ShowStreams::OutputSymbols(const SymbolTable &) const {
 
 class ShowStreamsCursor : public Cursor {
  public:
-  ShowStreamsCursor(const ShowStreams &self, database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+  ShowStreamsCursor(const ShowStreams &self, database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -4030,7 +4030,7 @@ class ShowStreamsCursor : public Cursor {
     }
 
     if (!is_initialized_) {
-      streams_ = db_.db().kafka_streams().ShowStreams();
+      streams_ = ctx.kafka_streams_->Show();
       streams_it_ = streams_.begin();
       is_initialized_ = true;
     }
@@ -4052,7 +4052,6 @@ class ShowStreamsCursor : public Cursor {
 
  private:
   const ShowStreams &self_;
-  database::GraphDbAccessor &db_;
 
   bool is_initialized_ = false;
   using StreamInfo = integrations::kafka::StreamInfo;
@@ -4076,8 +4075,8 @@ WITHOUT_SINGLE_INPUT(StartStopStream)
 class StartStopStreamCursor : public Cursor {
  public:
   StartStopStreamCursor(const StartStopStream &self,
-                        database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+                        database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -4093,12 +4092,11 @@ class StartStopStreamCursor : public Cursor {
 
     try {
       if (self_.is_start()) {
-        db_.db().kafka_streams().StartStream(self_.stream_name(),
-                                             limit_batches);
+        ctx.kafka_streams_->Start(self_.stream_name(), limit_batches);
       } else {
-        db_.db().kafka_streams().StopStream(self_.stream_name());
+        ctx.kafka_streams_->Stop(self_.stream_name());
       }
-    } catch (const KafkaStreamException &e) {
+    } catch (const integrations::kafka::KafkaStreamException &e) {
       throw QueryRuntimeException(e.what());
     }
 
@@ -4109,7 +4107,6 @@ class StartStopStreamCursor : public Cursor {
 
  private:
   const StartStopStream &self_;
-  database::GraphDbAccessor &db_;
 };
 
 std::unique_ptr<Cursor> StartStopStream::MakeCursor(
@@ -4124,8 +4121,8 @@ WITHOUT_SINGLE_INPUT(StartStopAllStreams)
 class StartStopAllStreamsCursor : public Cursor {
  public:
   StartStopAllStreamsCursor(const StartStopAllStreams &self,
-                            database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+                            database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -4134,11 +4131,11 @@ class StartStopAllStreamsCursor : public Cursor {
 
     try {
       if (self_.is_start()) {
-        db_.db().kafka_streams().StartAllStreams();
+        ctx.kafka_streams_->StartAll();
       } else {
-        db_.db().kafka_streams().StopAllStreams();
+        ctx.kafka_streams_->StopAll();
       }
-    } catch (const KafkaStreamException &e) {
+    } catch (const integrations::kafka::KafkaStreamException &e) {
       throw QueryRuntimeException(e.what());
     }
 
@@ -4151,7 +4148,6 @@ class StartStopAllStreamsCursor : public Cursor {
 
  private:
   const StartStopAllStreams &self_;
-  database::GraphDbAccessor &db_;
 };
 
 std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(
@@ -4167,10 +4163,14 @@ TestStream::TestStream(std::string stream_name, Expression *limit_batches,
 
 WITHOUT_SINGLE_INPUT(TestStream)
 
+std::vector<Symbol> TestStream::OutputSymbols(const SymbolTable &) const {
+  return {test_result_symbol_};
+}
+
 class TestStreamCursor : public Cursor {
  public:
-  TestStreamCursor(const TestStream &self, database::GraphDbAccessor &db)
-      : self_(self), db_(db) {}
+  TestStreamCursor(const TestStream &self, database::GraphDbAccessor &)
+      : self_(self) {}
 
   bool Pull(Frame &frame, Context &ctx) override {
     if (ctx.in_explicit_transaction_) {
@@ -4187,9 +4187,8 @@ class TestStreamCursor : public Cursor {
       }
 
       try {
-        results_ = db_.db().kafka_streams().TestStream(self_.stream_name(),
-                                                       limit_batches);
-      } catch (const KafkaStreamException &e) {
+        results_ = ctx.kafka_streams_->Test(self_.stream_name(), limit_batches);
+      } catch (const integrations::kafka::KafkaStreamException &e) {
         throw QueryRuntimeException(e.what());
       }
       results_it_ = results_.begin();
@@ -4208,7 +4207,6 @@ class TestStreamCursor : public Cursor {
 
  private:
   const TestStream &self_;
-  database::GraphDbAccessor &db_;
 
   bool is_initialized_ = false;
   std::vector<std::string> results_;
diff --git a/src/query/plan/operator.lcp b/src/query/plan/operator.lcp
index 2bd5365ce..60b976f69 100644
--- a/src/query/plan/operator.lcp
+++ b/src/query/plan/operator.lcp
@@ -2703,6 +2703,7 @@ in the db.")
    DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
    std::unique_ptr<Cursor> MakeCursor(
        database::GraphDbAccessor &db) const override;
+   std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
    std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
      return {};
    }
diff --git a/src/query/plan/rule_based_planner.hpp b/src/query/plan/rule_based_planner.hpp
index 58e0336bf..d1e2c8794 100644
--- a/src/query/plan/rule_based_planner.hpp
+++ b/src/query/plan/rule_based_planner.hpp
@@ -212,7 +212,7 @@ class RuleBasedPlanner {
               symbol_table.CreateSymbol("uri", false),
               symbol_table.CreateSymbol("topic", false),
               symbol_table.CreateSymbol("transform", false),
-              symbol_table.CreateSymbol("is running", false));
+              symbol_table.CreateSymbol("is_running", false));
         } else if (auto *start_stop_stream =
                        dynamic_cast<query::StartStopStream *>(clause)) {
           DCHECK(!input_op) << "Unexpected operator before StartStopStream";
@@ -230,7 +230,7 @@ class RuleBasedPlanner {
           auto &symbol_table = context.symbol_table;
           input_op = std::make_unique<plan::TestStream>(
               test_stream->stream_name_, test_stream->limit_batches_,
-              symbol_table.CreateSymbol("test result", false));
+              symbol_table.CreateSymbol("test_result", false));
         } else {
           throw utils::NotYetImplemented("clause conversion to operator(s)");
         }
diff --git a/src/requests/CMakeLists.txt b/src/requests/CMakeLists.txt
new file mode 100644
index 000000000..370bb0c2c
--- /dev/null
+++ b/src/requests/CMakeLists.txt
@@ -0,0 +1,8 @@
+set(requests_src_files
+    requests.cpp)
+
+find_package(CURL REQUIRED)
+
+add_library(mg-requests STATIC ${requests_src_files})
+target_link_libraries(mg-requests fmt glog gflags json ${CURL_LIBRARIES})
+target_include_directories(mg-requests PRIVATE ${CURL_INCLUDE_DIRS})
diff --git a/src/telemetry/requests.cpp b/src/requests/requests.cpp
similarity index 51%
rename from src/telemetry/requests.cpp
rename to src/requests/requests.cpp
index de0f6e2cb..e0593004f 100644
--- a/src/telemetry/requests.cpp
+++ b/src/requests/requests.cpp
@@ -1,21 +1,26 @@
-#include "telemetry/requests.hpp"
+#include "requests/requests.hpp"
+
+#include <cstdio>
 
 #include <curl/curl.h>
-
 #include <fmt/format.h>
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
-namespace telemetry {
+namespace requests {
 
-void RequestsInit() { curl_global_init(CURL_GLOBAL_ALL); }
+namespace {
 
 size_t CurlWriteCallback(char *ptr, size_t size, size_t nmemb, void *userdata) {
   return nmemb;
 }
 
+}  // namespace
+
+void Init() { curl_global_init(CURL_GLOBAL_ALL); }
+
 bool RequestPostJson(const std::string &url, const nlohmann::json &data,
-                     const int timeout) {
+                     int timeout_in_seconds) {
   CURL *curl = nullptr;
   CURLcode res = CURLE_UNSUPPORTED_PROTOCOL;
 
@@ -40,7 +45,7 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data,
   curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
   curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10);
   curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
-  curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout);
+  curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout_in_seconds);
 
   res = curl_easy_perform(curl);
   curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
@@ -60,4 +65,46 @@ bool RequestPostJson(const std::string &url, const nlohmann::json &data,
   return true;
 }
 
-}  // namespace telemetry
+bool CreateAndDownloadFile(const std::string &url, const std::string &path,
+                           int timeout_in_seconds) {
+  CURL *curl = nullptr;
+  CURLcode res = CURLE_UNSUPPORTED_PROTOCOL;
+
+  long response_code = 0;
+  std::string user_agent = fmt::format("memgraph/{}", gflags::VersionString());
+
+  curl = curl_easy_init();
+  if (!curl) return false;
+
+  FILE *file = std::fopen(path.c_str(), "w");
+  if (!file) return false;
+
+  curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
+  curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
+  curl_easy_setopt(curl, CURLOPT_USERAGENT, user_agent.c_str());
+  curl_easy_setopt(curl, CURLOPT_WRITEDATA, file);
+  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1);
+  curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10);
+  curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
+  curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeout_in_seconds);
+
+  res = curl_easy_perform(curl);
+  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code);
+  curl_easy_cleanup(curl);
+  std::fclose(file);
+
+  if (res != CURLE_OK) {
+    DLOG(WARNING) << "Couldn't perform request: " << curl_easy_strerror(res);
+    return false;
+  }
+
+  if (response_code != 200) {
+    DLOG(WARNING) << "Request response code isn't 200 (received "
+                  << response_code << ")!";
+    return false;
+  }
+
+  return true;
+}
+
+}  // namespace requests
diff --git a/src/requests/requests.hpp b/src/requests/requests.hpp
new file mode 100644
index 000000000..4de22d436
--- /dev/null
+++ b/src/requests/requests.hpp
@@ -0,0 +1,41 @@
+#pragma once
+
+#include <string>
+
+#include <json/json.hpp>
+
+namespace requests {
+
+/**
+ * Call this function in each `main` file that uses the Requests stack. It is
+ * used to initialize all libraries (primarily cURL).
+ *
+ * NOTE: This function must be called **exactly** once.
+ */
+void Init();
+
+/**
+ *
+ * This function sends a POST request with a JSON payload to the `url`.
+ *
+ * @param url url to which to send the request
+ * @param data json payload
+ * @param timeout_in_seconds the timeout (in seconds) used for the request
+ * @return bool true if the request was successful, false otherwise.
+ */
+bool RequestPostJson(const std::string &url, const nlohmann::json &data,
+                     int timeout_in_seconds = 10);
+
+/**
+ * This function sends a GET request to the given `url` and writes the response
+ * to the given `path`.
+ *
+ * @param url url to which to send the request
+ * @param path path to the file where the response is written
+ * @param timeout_in_seconds the timeout (in seconds) used for the request
+ * @return bool true if the request was successful, false otherwise.
+ */
+bool CreateAndDownloadFile(const std::string &url, const std::string &path,
+                           int timeout_in_seconds = 10);
+
+}  // namespace requests
diff --git a/src/telemetry/CMakeLists.txt b/src/telemetry/CMakeLists.txt
index 128119391..ae7841d21 100644
--- a/src/telemetry/CMakeLists.txt
+++ b/src/telemetry/CMakeLists.txt
@@ -1,11 +1,7 @@
 set(telemetry_src_files
     collectors.cpp
-    requests.cpp
     telemetry.cpp
     system_info.cpp)
 
-find_package(CURL REQUIRED)
-
 add_library(telemetry_lib STATIC ${telemetry_src_files})
-target_link_libraries(telemetry_lib glog json ${CURL_LIBRARIES} kvstore_lib)
-target_include_directories(telemetry_lib PRIVATE ${CURL_INCLUDE_DIRS})
+target_link_libraries(telemetry_lib glog mg-requests kvstore_lib)
diff --git a/src/telemetry/requests.hpp b/src/telemetry/requests.hpp
deleted file mode 100644
index 1bb707587..000000000
--- a/src/telemetry/requests.hpp
+++ /dev/null
@@ -1,26 +0,0 @@
-#pragma once
-
-#include <string>
-
-#include <json/json.hpp>
-
-namespace telemetry {
-
-/**
- * This function is called by the main telemetry `Init` function to initialize
- * the requests library.
- */
-void RequestsInit();
-
-/**
- * This function sends a POST request with a JSON payload to the `url`.
- *
- * @param url url to which to send the request
- * @param data json payload
- * @param timeout the timeout that should be used when making the request
- * @return bool true if the request was successful, false otherwise.
- */
-bool RequestPostJson(const std::string &url, const nlohmann::json &data,
-                     const int timeout = 10);
-
-}  // namespace telemetry
diff --git a/src/telemetry/telemetry.cpp b/src/telemetry/telemetry.cpp
index 12377e653..d78f8a82d 100644
--- a/src/telemetry/telemetry.cpp
+++ b/src/telemetry/telemetry.cpp
@@ -5,8 +5,8 @@
 #include <fmt/format.h>
 #include <glog/logging.h>
 
+#include "requests/requests.hpp"
 #include "telemetry/collectors.hpp"
-#include "telemetry/requests.hpp"
 #include "telemetry/system_info.hpp"
 #include "utils/timestamp.hpp"
 #include "utils/uuid.hpp"
@@ -15,8 +15,6 @@ namespace telemetry {
 
 const int kMaxBatchSize = 100;
 
-void Init() { RequestsInit(); }
-
 Telemetry::Telemetry(
     const std::string &url,
     const std::experimental::filesystem::path &storage_directory,
@@ -69,7 +67,7 @@ void Telemetry::SendData() {
     }
   }
 
-  if (RequestPostJson(url_, payload)) {
+  if (requests::RequestPostJson(url_, payload)) {
     for (const auto &key : keys) {
       if (!storage_.Delete(key)) {
         DLOG(WARNING) << "Couldn't delete key " << key
diff --git a/src/telemetry/telemetry.hpp b/src/telemetry/telemetry.hpp
index eead25cc8..a008bab84 100644
--- a/src/telemetry/telemetry.hpp
+++ b/src/telemetry/telemetry.hpp
@@ -11,14 +11,6 @@
 
 namespace telemetry {
 
-/**
- * Call this function in each `main` file that uses the Telemetry stack. It is
- * used to initialize all libraries (primarily cURL).
- *
- * NOTE: This function must be called **exactly** once.
- */
-void Init();
-
 /**
 * This class implements the telemetry collector service. It periodically scrapes
  * all registered collectors and stores their data. With periodically scraping
diff --git a/tests/integration/telemetry/CMakeLists.txt b/tests/integration/telemetry/CMakeLists.txt
index 96dca10e2..68763cb6d 100644
--- a/tests/integration/telemetry/CMakeLists.txt
+++ b/tests/integration/telemetry/CMakeLists.txt
@@ -3,4 +3,4 @@ set(client_target_name ${target_name}__client)
 
 add_executable(${client_target_name} client.cpp)
 set_target_properties(${client_target_name} PROPERTIES OUTPUT_NAME client)
-target_link_libraries(${client_target_name} telemetry_lib)
+target_link_libraries(${client_target_name} mg-requests telemetry_lib)
diff --git a/tests/integration/telemetry/client.cpp b/tests/integration/telemetry/client.cpp
index bebfbbeaa..a47e3c74f 100644
--- a/tests/integration/telemetry/client.cpp
+++ b/tests/integration/telemetry/client.cpp
@@ -1,5 +1,6 @@
 #include <gflags/gflags.h>
 
+#include "requests/requests.hpp"
 #include "telemetry/telemetry.hpp"
 
 DEFINE_string(endpoint, "http://127.0.0.1:9000/",
@@ -14,7 +15,7 @@ int main(int argc, char **argv) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
-  telemetry::Init();
+  requests::Init();
   telemetry::Telemetry telemetry(FLAGS_endpoint, FLAGS_storage_directory,
                                  std::chrono::seconds(FLAGS_interval), 1);