From 1febc15d687e66291fca84aab6946711d1510c2b Mon Sep 17 00:00:00 2001
From: Matej Ferencevic <matej.ferencevic@memgraph.io>
Date: Tue, 14 Aug 2018 11:11:30 +0200
Subject: [PATCH] Implement kafka integration test
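
Summary:
Start zookeeper, kafka and memgraph, define a stream over a kafka topic
using a transform script served from a local Python HTTP server, produce a
few messages and verify that the resulting graph is correct. Also add a
debug log to the kafka consumer when a batch is processed.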

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1538
---
 src/integrations/kafka/consumer.cpp    |   3 +
 tests/integration/CMakeLists.txt       |   3 +
 tests/integration/apollo_runs.yaml     |  11 ++
 tests/integration/kafka/CMakeLists.txt |   6 +
 tests/integration/kafka/runner.sh      | 165 +++++++++++++++++++++
 tests/integration/kafka/tester.cpp     |  88 ++++++++++++
 tests/integration/kafka/transform.py   |  22 +++
 7 files changed, 298 insertions(+)
 create mode 100644 tests/integration/kafka/CMakeLists.txt
 create mode 100755 tests/integration/kafka/runner.sh
 create mode 100644 tests/integration/kafka/tester.cpp
 create mode 100644 tests/integration/kafka/transform.py

diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index a9301924f..f357fa46c 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -134,6 +134,9 @@ void Consumer::StartConsuming(
 
       if (batch.empty()) continue;
 
+      DLOG(INFO) << "[Kafka] stream " << info_.stream_name
+                 << " processing a batch";
+
       // All exceptions that could be possibly thrown by the `Apply` function
       // must be handled here because they *will* crash the database if
       // uncaught!
diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index 1f7abaf31..c4d559ea4 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -6,3 +6,6 @@ add_subdirectory(ssl)
 
 # transactions test binaries
 add_subdirectory(transactions)
+
+# kafka test binaries
+add_subdirectory(kafka)
diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml
index 213519bba..b91328a2a 100644
--- a/tests/integration/apollo_runs.yaml
+++ b/tests/integration/apollo_runs.yaml
@@ -22,3 +22,14 @@
     - runner.sh # runner script
     - ../../../build_debug/memgraph # memgraph binary
     - ../../../build_debug/tests/integration/transactions/tester # tester binary
+
+- name: integration__kafka
+  cd: kafka
+  commands: ./runner.sh
+  infiles:
+    - runner.sh # runner script
+    - transform.py # transform script
+    - ../../../build_debug/memgraph # memgraph binary
+    - ../../../build_debug/kafka.py # kafka script
+    - ../../../build_debug/tests/integration/kafka/tester # tester binary
+  enable_network: true
diff --git a/tests/integration/kafka/CMakeLists.txt b/tests/integration/kafka/CMakeLists.txt
new file mode 100644
index 000000000..74781a6a4
--- /dev/null
+++ b/tests/integration/kafka/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(target_name memgraph__integration__kafka)
+set(tester_target_name ${target_name}__tester)
+
+add_executable(${tester_target_name} tester.cpp)
+set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
+target_link_libraries(${tester_target_name} mg-communication)
diff --git a/tests/integration/kafka/runner.sh b/tests/integration/kafka/runner.sh
new file mode 100755
index 000000000..da900ff8f
--- /dev/null
+++ b/tests/integration/kafka/runner.sh
@@ -0,0 +1,165 @@
+#!/bin/bash
+
+## Helper functions
+
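+# Poll the given port on localhost until a TCP connection succeeds, then
+# give the service one extra second to finish starting up.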
+function wait_for_server {
+    port=$1
+    while ! nc -z -w 1 127.0.0.1 $port; do
+        sleep 0.1
+    done
+    sleep 1
+}
+
+function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
+function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
+function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }
+
+
+## Environment setup
+
+# Get script location.
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd "$DIR"
+
+# Create a temporary directory.
+tmpdir=/tmp/memgraph_integration_kafka
+if [ -d $tmpdir ]; then
+    rm -rf $tmpdir
+fi
+mkdir -p $tmpdir
+cd $tmpdir
+
+# Download the kafka binaries.
+kafka="kafka_2.11-2.0.0"
+wget -nv http://deps.memgraph.io/$kafka.tgz
+tar -xf $kafka.tgz
+mv $kafka kafka
+
+# Find memgraph binaries.
+binary_dir="$DIR/../../../build"
+if [ ! -d "$binary_dir" ]; then
+    binary_dir="$DIR/../../../build_debug"
+fi
+
+# Cleanup old kafka logs.
+if [ -d /tmp/kafka-logs ]; then
+    rm -rf /tmp/kafka-logs
+fi
+if [ -d /tmp/zookeeper ]; then
+    rm -rf /tmp/zookeeper
+fi
+
+
+## Startup
+
+# Start the zookeeper process and wait for it to start.
+echo_info "Starting zookeeper"
+./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
+wait_for_server 2181
+echo_success "Started zookeeper"
+
+# Start the kafka process and wait for it to start.
+echo_info "Starting kafka"
+./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
+wait_for_server 9092
+echo_success "Started kafka"
+
+# Start the memgraph process and wait for it to start.
+echo_info "Starting memgraph"
+"$binary_dir/memgraph" &
+pid=$!
+wait_for_server 7687
+echo_success "Started memgraph"
+
+
+## Run the test
+
+# Create the kafka topic.
+echo_info "Creating kafka topic"
+./kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
+echo_success "Created kafka topic"
+
+# Start an HTTP server to serve the transform script.
+echo_info "Starting Python HTTP server"
+mkdir serve
+cd serve
+cp "$DIR/transform.py" transform.py
+python3 -m http.server &
+http_pid=$!
+wait_for_server 8000
+cd ..
+echo_success "Started Python HTTP server"
+
+# Create and start the stream in memgraph.
+echo_info "Defining and starting the stream in memgraph"
+"$binary_dir/tests/integration/kafka/tester" --step start
+code1=$?
+if [ $code1 -eq 0 ]; then
+    echo_success "Defined and started the stream in memgraph"
+else
+    echo_failure "Couldn't define and/or start the stream in memgraph"
+fi
+
+# Wait for the streams to start up.
+sleep 10
+
+# Produce some messages.
+echo_info "Producing kafka messages"
+./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test <<EOF
+1
+2
+3
+4
+1 2
+3 4
+1 4
+EOF
+echo_success "Produced kafka messages"
+
+# Wait for the messages to be consumed.
+sleep 10
+
+# Verify that the received graph is good.
+echo_info "Checking the received graph in memgraph"
+"$binary_dir/tests/integration/kafka/tester" --step verify
+code2=$?
+if [ $code2 -eq 0 ]; then
+    echo_success "Checked the received graph in memgraph"
+else
+    echo_failure "Couldn't check the received graph in memgraph"
+fi
+
+
+## Cleanup
+
+echo_info "Starting test cleanup"
+
+# Shut down the HTTP server.
+kill $http_pid
+wait $http_pid
+
+# Shut down the memgraph process.
+kill $pid
+wait $pid
+code_mg=$?
+
+# Shut down the kafka process.
+./kafka/bin/kafka-server-stop.sh
+
+# Shut down the zookeeper process.
+./kafka/bin/zookeeper-server-stop.sh
+
+echo_success "Test cleanup done"
+
+# Check memgraph exit code.
+if [ $code_mg -ne 0 ]; then
+    echo "The memgraph process didn't terminate properly!"
+    exit $code_mg
+fi
+
+# Exit with the exit code of the test.
+[ $code1 -ne 0 ] && exit $code1
+[ $code2 -ne 0 ] && exit $code2
+exit 0
diff --git a/tests/integration/kafka/tester.cpp b/tests/integration/kafka/tester.cpp
new file mode 100644
index 000000000..70f6f95b9
--- /dev/null
+++ b/tests/integration/kafka/tester.cpp
@@ -0,0 +1,88 @@
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "communication/bolt/client.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/utils.hpp"
+#include "utils/timer.hpp"
+
+DEFINE_string(address, "127.0.0.1", "Server address");
+DEFINE_int32(port, 7687, "Server port");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
+
+DEFINE_string(step, "", "Step that should be executed on the database.");
+
+void ExecuteQuery(communication::bolt::Client &client,
+                  const std::string &query) {
+  try {
+    client.Execute(query, {});
+  } catch (const communication::bolt::ClientQueryException &e) {
+    LOG(FATAL) << "Couldn't execute query '" << query
+               << "'! Received exception: " << e.what();
+  }
+}
+
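+// Executes the query and checks that the first field of the first returned
+// record equals `value`.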
+void ExecuteQueryAndCheck(communication::bolt::Client &client,
+                          const std::string &query, int64_t value) {
+  try {
+    auto resp = client.Execute(query, {});
+    if (resp.records.size() == 0 || resp.records[0].size() == 0) {
+      LOG(FATAL) << "The query '" << query << "' didn't return records!";
+    }
+    if (resp.records[0][0].ValueInt() != value) {
+      LOG(FATAL) << "The query '" << query << "' was expected to return "
+                 << value << " but it returned "
+                 << resp.records[0][0].ValueInt() << "!";
+    }
+  } catch (const communication::bolt::ClientQueryException &e) {
+    LOG(FATAL) << "Couldn't execute query '" << query
+               << "'! Received exception: " << e.what();
+  }
+}
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  communication::Init();
+
+  io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address),
+                                 FLAGS_port);
+
+  communication::ClientContext context(FLAGS_use_ssl);
+  communication::bolt::Client client(&context);
+
+  if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
+    LOG(FATAL) << "Couldn't connect to server " << FLAGS_address << ":"
+               << FLAGS_port;
+  }
+
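+  // The "start" step defines the stream and starts it; the "verify" step
+  // checks that the produced messages created the expected graph.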
+  if (FLAGS_step == "start") {
+    ExecuteQuery(client,
+                 "CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH "
+                 "TOPIC 'test' WITH TRANSFORM "
+                 "'http://127.0.0.1:8000/transform.py'");
+    ExecuteQuery(client, "START STREAM strim");
+  } else if (FLAGS_step == "verify") {
+    ExecuteQueryAndCheck(client,
+                         "UNWIND RANGE(1, 4) AS x MATCH (n:node {num: "
+                         "toString(x)}) RETURN count(n)",
+                         4);
+    ExecuteQueryAndCheck(client,
+                         "UNWIND [[1, 2], [3, 4], [1, 4]] AS x MATCH (n:node "
+                         "{num: toString(x[0])})-[e:et]-(m:node {num: "
+                         "toString(x[1])}) RETURN count(e)",
+                         3);
+
+  } else {
+    LOG(FATAL) << "Unknown step " << FLAGS_step << "!";
+  }
+
+  return 0;
+}
diff --git a/tests/integration/kafka/transform.py b/tests/integration/kafka/transform.py
new file mode 100644
index 000000000..1fd87ca9b
--- /dev/null
+++ b/tests/integration/kafka/transform.py
@@ -0,0 +1,22 @@
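+# Transform script for the kafka integration test. Memgraph calls the
+# stream(batch) function with a list of raw message payloads (bytes) and
+# expects back a list of (query, parameters) pairs to run against the graph.
+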
+index_done = False
+
+def stream(batch):
+    global index_done
+    ret = []
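+    # Create the index only once, on the first consumed batch.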
+    if not index_done:
+        ret.append(("CREATE INDEX ON :node(num)", {}))
+        index_done = True
+    for item in batch:
+        message = item.decode("utf-8").strip().split()
+        if len(message) == 1:
+            ret.append(("CREATE (:node {num: $num})", {"num": message[0]}))
+        elif len(message) == 2:
+            ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]}))
+    return ret