diff --git a/src/integrations/kafka/consumer.cpp b/src/integrations/kafka/consumer.cpp
index a9301924f..f357fa46c 100644
--- a/src/integrations/kafka/consumer.cpp
+++ b/src/integrations/kafka/consumer.cpp
@@ -134,6 +134,9 @@ void Consumer::StartConsuming(
     if (batch.empty()) continue;
 
+    DLOG(INFO) << "[Kafka] stream " << info_.stream_name
+               << " processing a batch";
+
     // All exceptions that could be possibly thrown by the `Apply` function
     // must be handled here because they *will* crash the database if
     // uncaught!
 
diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index 1f7abaf31..c4d559ea4 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -6,3 +6,6 @@ add_subdirectory(ssl)
 
 # transactions test binaries
 add_subdirectory(transactions)
+
+# kafka test binaries
+add_subdirectory(kafka)
diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml
index 213519bba..b91328a2a 100644
--- a/tests/integration/apollo_runs.yaml
+++ b/tests/integration/apollo_runs.yaml
@@ -22,3 +22,14 @@
     - runner.sh # runner script
     - ../../../build_debug/memgraph # memgraph binary
     - ../../../build_debug/tests/integration/transactions/tester # tester binary
+
+- name: integration__kafka
+  cd: kafka
+  commands: ./runner.sh
+  infiles:
+    - runner.sh # runner script
+    - transform.py # transform script
+    - ../../../build_debug/memgraph # memgraph binary
+    - ../../../build_debug/kafka.py # kafka script
+    - ../../../build_debug/tests/integration/kafka/tester # tester binary
+  enable_network: true
diff --git a/tests/integration/kafka/CMakeLists.txt b/tests/integration/kafka/CMakeLists.txt
new file mode 100644
index 000000000..74781a6a4
--- /dev/null
+++ b/tests/integration/kafka/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(target_name memgraph__integration__kafka)
+set(tester_target_name ${target_name}__tester)
+
+add_executable(${tester_target_name} tester.cpp)
+set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
+target_link_libraries(${tester_target_name} mg-communication)
diff --git a/tests/integration/kafka/runner.sh b/tests/integration/kafka/runner.sh
new file mode 100755
index 000000000..da900ff8f
--- /dev/null
+++ b/tests/integration/kafka/runner.sh
@@ -0,0 +1,163 @@
+#!/bin/bash
+
+## Helper functions
+
+function wait_for_server {
+    port=$1
+    while ! nc -z -w 1 127.0.0.1 $port; do
+        sleep 0.1
+    done
+    sleep 1
+}
+
+function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
+function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
+function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }
+
+
+## Environment setup
+
+# Get script location.
+DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+cd "$DIR"
+
+# Create a temporary directory.
+tmpdir=/tmp/memgraph_integration_kafka
+if [ -d $tmpdir ]; then
+    rm -rf $tmpdir
+fi
+mkdir -p $tmpdir
+cd $tmpdir
+
+# Download the kafka binaries.
+kafka="kafka_2.11-2.0.0"
+wget -nv http://deps.memgraph.io/$kafka.tgz
+tar -xf $kafka.tgz
+mv $kafka kafka
+
+# Find memgraph binaries.
+binary_dir="$DIR/../../../build"
+if [ ! -d $binary_dir ]; then
+    binary_dir="$DIR/../../../build_debug"
+fi
+
+# Cleanup old kafka logs.
+if [ -d /tmp/kafka-logs ]; then
+    rm -rf /tmp/kafka-logs
+fi
+if [ -d /tmp/zookeeper ]; then
+    rm -rf /tmp/zookeeper
+fi
+
+
+## Startup
+
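+# NOTE: every service below is polled with wait_for_server until its port
+# accepts TCP connections: zookeeper on 2181, kafka on 9092, memgraph (bolt)
+# on 7687 and the transform-script HTTP server on 8000.
+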
+echo_info "Starting zookeeper" +./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties +wait_for_server 2181 +echo_success "Started zookeeper" + +# Start the kafka process and wait for it to start. +echo_info "Starting kafka" +./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties +wait_for_server 9092 +echo_success "Started kafka" + +# Start the memgraph process and wait for it to start. +echo_info "Starting memgraph" +$binary_dir/memgraph & +pid=$! +wait_for_server 7687 +echo_success "Started memgraph" + + +## Run the test + +# Create the kafka topic. +echo_info "Creating kafka topic" +./kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test +echo_success "Created kafka topic" + +# Start a http server to serve the transform script. +echo_info "Starting Python HTTP server" +mkdir serve +cd serve +cp "$DIR/transform.py" transform.py +python3 -m http.server & +wait_for_server 8000 +http_pid=$! +cd .. +echo_success "Started Python HTTP server" + +# Create and start the stream in memgraph. +echo_info "Defining and starting the stream in memgraph" +$binary_dir/tests/integration/kafka/tester --step start +code1=$? +if [ $code1 -eq 0 ]; then + echo_success "Defined and started the stream in memgraph" +else + echo_failure "Couldn't define and/or start the stream in memgraph" +fi + +# Wait for the streams to start up. +sleep 10 + +# Produce some messages. +echo_info "Producing kafka messages" +./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test < +#include + +#include "communication/bolt/client.hpp" +#include "io/network/endpoint.hpp" +#include "io/network/utils.hpp" +#include "utils/timer.hpp" + +DEFINE_string(address, "127.0.0.1", "Server address"); +DEFINE_int32(port, 7687, "Server port"); +DEFINE_string(username, "", "Username for the database"); +DEFINE_string(password, "", "Password for the database"); +DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); + +DEFINE_string(step, "", "Step that should be executed on the database."); + +void ExecuteQuery(communication::bolt::Client &client, + const std::string &query) { + try { + client.Execute(query, {}); + } catch (const communication::bolt::ClientQueryException &e) { + LOG(FATAL) << "Couldn't execute query '" << query + << "'! Received exception: " << e.what(); + } +} + +void ExecuteQueryAndCheck(communication::bolt::Client &client, + const std::string &query, int64_t value) { + try { + auto resp = client.Execute(query, {}); + if (resp.records.size() == 0 || resp.records[0].size() == 0) { + LOG(FATAL) << "The query '" << query << "' didn't return records!"; + } + if (resp.records[0][0].ValueInt() != value) { + LOG(FATAL) << "The query '" << query << "' was expected to return " + << value << " but it returned " + << resp.records[0][0].ValueInt() << "!"; + } + } catch (const communication::bolt::ClientQueryException &e) { + LOG(FATAL) << "Couldn't execute query '" << query + << "'! 
Received exception: " << e.what(); + } +} + +int main(int argc, char **argv) { + gflags::ParseCommandLineFlags(&argc, &argv, true); + google::InitGoogleLogging(argv[0]); + + communication::Init(); + + io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address), + FLAGS_port); + + communication::ClientContext context(FLAGS_use_ssl); + communication::bolt::Client client(&context); + + if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) { + LOG(FATAL) << "Couldn't connect to server " << FLAGS_address << ":" + << FLAGS_port; + } + + if (FLAGS_step == "start") { + ExecuteQuery(client, + "CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH " + "TOPIC 'test' WITH TRANSFORM " + "'http://127.0.0.1:8000/transform.py'"); + ExecuteQuery(client, "START STREAM strim"); + } else if (FLAGS_step == "verify") { + ExecuteQueryAndCheck(client, + "UNWIND RANGE(1, 4) AS x MATCH (n:node {num: " + "toString(x)}) RETURN count(n)", + 4); + ExecuteQueryAndCheck(client, + "UNWIND [[1, 2], [3, 4], [1, 4]] AS x MATCH (n:node " + "{num: toString(x[0])})-[e:et]-(m:node {num: " + "toString(x[1])}) RETURN count(e)", + 3); + + } else { + LOG(FATAL) << "Unknown step " << FLAGS_step << "!"; + } + + return 0; +} diff --git a/tests/integration/kafka/transform.py b/tests/integration/kafka/transform.py new file mode 100644 index 000000000..1fd87ca9b --- /dev/null +++ b/tests/integration/kafka/transform.py @@ -0,0 +1,15 @@ +index_done = False + +def stream(batch): + global index_done + ret = [] + if not index_done: + ret.append(("CREATE INDEX ON :node(num)", {})) + index_done = True + for item in batch: + message = item.decode("utf-8").strip().split() + if len(message) == 1: + ret.append(("CREATE (:node {num: $num})", {"num": message[0]})) + elif len(message) == 2: + ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) CREATE (n)-[:et]->(m)", {"num1": message[0], "num2": message[1]})) + return ret