Implement kafka integration test

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1538
Matej Ferencevic 2018-08-14 11:11:30 +02:00
parent 158f97206d
commit 1febc15d68
7 changed files with 285 additions and 0 deletions


@@ -134,6 +134,9 @@ void Consumer::StartConsuming(
    if (batch.empty()) continue;
    DLOG(INFO) << "[Kafka] stream " << info_.stream_name
               << " processing a batch";
    // All exceptions that could possibly be thrown by the `Apply` function
    // must be handled here because they *will* crash the database if left
    // uncaught!
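The comment above documents a hard constraint: presumably the consumer loop runs on its own thread, where an escaped exception terminates the whole process. Purely as an illustrative sketch (the real handler sits below this hunk; the `Apply(batch)` call shape and `StopConsuming()` recovery are assumptions, not Memgraph's actual code), the guarded call would look roughly like this:

    // Illustrative sketch -- the call shape and the recovery path are assumed.
    try {
      Apply(batch);
    } catch (const std::exception &e) {
      LOG(WARNING) << "[Kafka] stream " << info_.stream_name
                   << " couldn't apply a batch: " << e.what();
      StopConsuming();  // assumed recovery: stop the stream, keep the database up
    } catch (...) {
      LOG(WARNING) << "[Kafka] stream " << info_.stream_name
                   << " couldn't apply a batch: unknown exception";
      StopConsuming();
    }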


@@ -6,3 +6,6 @@ add_subdirectory(ssl)

# transactions test binaries
add_subdirectory(transactions)

# kafka test binaries
add_subdirectory(kafka)


@@ -22,3 +22,14 @@
    - runner.sh # runner script
    - ../../../build_debug/memgraph # memgraph binary
    - ../../../build_debug/tests/integration/transactions/tester # tester binary

- name: integration__kafka
  cd: kafka
  commands: ./runner.sh
  infiles:
    - runner.sh # runner script
    - transform.py # transform script
    - ../../../build_debug/memgraph # memgraph binary
    - ../../../build_debug/kafka.py # kafka script
    - ../../../build_debug/tests/integration/kafka/tester # tester binary
  enable_network: true
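The `enable_network: true` flag here lines up with runner.sh (below), which downloads the Kafka distribution from deps.memgraph.io while the test executes.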


@@ -0,0 +1,6 @@
set(target_name memgraph__integration__kafka)
set(tester_target_name ${target_name}__tester)

add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-communication)

tests/integration/kafka/runner.sh (executable file)

@@ -0,0 +1,163 @@
#!/bin/bash

## Helper functions

function wait_for_server {
    port=$1
    while ! nc -z -w 1 127.0.0.1 $port; do
        sleep 0.1
    done
    sleep 1
}

function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }

## Environment setup
# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"
# Create a temporary directory.
tmpdir=/tmp/memgraph_integration_kafka
if [ -d $tmpdir ]; then
    rm -rf $tmpdir
fi
mkdir -p $tmpdir
cd $tmpdir
# Download the kafka binaries.
kafka="kafka_2.11-2.0.0"
wget -nv http://deps.memgraph.io/$kafka.tgz
tar -xf $kafka.tgz
mv $kafka kafka
# Find memgraph binaries.
binary_dir="$DIR/../../../build"
if [ ! -d $binary_dir ]; then
    binary_dir="$DIR/../../../build_debug"
fi
# Cleanup old kafka logs.
if [ -d /tmp/kafka-logs ]; then
    rm -rf /tmp/kafka-logs
fi
if [ -d /tmp/zookeeper ]; then
    rm -rf /tmp/zookeeper
fi

## Startup
# Start the zookeeper process and wait for it to start.
echo_info "Starting zookeeper"
./kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties
wait_for_server 2181
echo_success "Started zookeeper"
# Start the kafka process and wait for it to start.
echo_info "Starting kafka"
./kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties
wait_for_server 9092
echo_success "Started kafka"
# Start the memgraph process and wait for it to start.
echo_info "Starting memgraph"
$binary_dir/memgraph &
pid=$!
wait_for_server 7687
echo_success "Started memgraph"
## Run the test
# Create the kafka topic.
echo_info "Creating kafka topic"
./kafka/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
echo_success "Created kafka topic"
# Start a http server to serve the transform script.
echo_info "Starting Python HTTP server"
mkdir serve
cd serve
cp "$DIR/transform.py" transform.py
python3 -m http.server &
http_pid=$!
wait_for_server 8000
cd ..
echo_success "Started Python HTTP server"
# Create and start the stream in memgraph.
echo_info "Defining and starting the stream in memgraph"
$binary_dir/tests/integration/kafka/tester --step start
code1=$?
if [ $code1 -eq 0 ]; then
    echo_success "Defined and started the stream in memgraph"
else
    echo_failure "Couldn't define and/or start the stream in memgraph"
fi
# Wait for the streams to start up.
sleep 10
# Produce some messages.
echo_info "Producing kafka messages"
./kafka/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test <<EOF
1
2
3
4
1 2
3 4
1 4
EOF
echo_success "Produced kafka messages"
# Wait for the messages to be consumed.
sleep 10
# Verify that the received graph is good.
echo_info "Checking the received graph in memgraph"
$binary_dir/tests/integration/kafka/tester --step verify
code2=$?
if [ $code2 -eq 0 ]; then
    echo_success "Checked the received graph in memgraph"
else
    echo_failure "Couldn't check the received graph in memgraph"
fi
## Cleanup
echo_info "Starting test cleanup"
# Shutdown the http server.
kill $http_pid
wait -n
# Shutdown the memgraph process.
kill $pid
wait -n
code_mg=$?
# Shutdown the kafka process.
./kafka/bin/kafka-server-stop.sh
# Shutdown the zookeeper process.
./kafka/bin/zookeeper-server-stop.sh
echo_success "Test cleanup done"
# Check memgraph exit code.
if [ $code_mg -ne 0 ]; then
    echo "The memgraph process didn't terminate properly!"
    exit $code_mg
fi
# Exit with the exitcode of the test.
[ $code1 -ne 0 ] && exit $code1
[ $code2 -ne 0 ] && exit $code2
exit 0


@@ -0,0 +1,84 @@
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "Step that should be executed on the database.");

void ExecuteQuery(communication::bolt::Client &client,
                  const std::string &query) {
  try {
    client.Execute(query, {});
  } catch (const communication::bolt::ClientQueryException &e) {
    LOG(FATAL) << "Couldn't execute query '" << query
               << "'! Received exception: " << e.what();
  }
}

void ExecuteQueryAndCheck(communication::bolt::Client &client,
                          const std::string &query, int64_t value) {
  try {
    auto resp = client.Execute(query, {});
    if (resp.records.size() == 0 || resp.records[0].size() == 0) {
      LOG(FATAL) << "The query '" << query << "' didn't return records!";
    }
    if (resp.records[0][0].ValueInt() != value) {
      LOG(FATAL) << "The query '" << query << "' was expected to return "
                 << value << " but it returned "
                 << resp.records[0][0].ValueInt() << "!";
    }
  } catch (const communication::bolt::ClientQueryException &e) {
    LOG(FATAL) << "Couldn't execute query '" << query
               << "'! Received exception: " << e.what();
  }
}

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  communication::Init();

  io::network::Endpoint endpoint(io::network::ResolveHostname(FLAGS_address),
                                 FLAGS_port);
  communication::ClientContext context(FLAGS_use_ssl);
  communication::bolt::Client client(&context);
  if (!client.Connect(endpoint, FLAGS_username, FLAGS_password)) {
    LOG(FATAL) << "Couldn't connect to server " << FLAGS_address << ":"
               << FLAGS_port;
  }

  if (FLAGS_step == "start") {
    ExecuteQuery(client,
                 "CREATE STREAM strim AS LOAD DATA KAFKA '127.0.0.1:9092' WITH "
                 "TOPIC 'test' WITH TRANSFORM "
                 "'http://127.0.0.1:8000/transform.py'");
    ExecuteQuery(client, "START STREAM strim");
  } else if (FLAGS_step == "verify") {
    ExecuteQueryAndCheck(client,
                         "UNWIND RANGE(1, 4) AS x MATCH (n:node {num: "
                         "toString(x)}) RETURN count(n)",
                         4);
    ExecuteQueryAndCheck(client,
                         "UNWIND [[1, 2], [3, 4], [1, 4]] AS x MATCH (n:node "
                         "{num: toString(x[0])})-[e:et]-(m:node {num: "
                         "toString(x[1])}) RETURN count(e)",
                         3);
  } else {
    LOG(FATAL) << "Unknown step " << FLAGS_step << "!";
  }

  return 0;
}


@@ -0,0 +1,15 @@
index_done = False


def stream(batch):
    global index_done
    ret = []
    # Create the index only once, on the first processed batch.
    if not index_done:
        ret.append(("CREATE INDEX ON :node(num)", {}))
        index_done = True
    for item in batch:
        message = item.decode("utf-8").strip().split()
        if len(message) == 1:
            # A one-value message creates a node.
            ret.append(("CREATE (:node {num: $num})", {"num": message[0]}))
        elif len(message) == 2:
            # A two-value message creates an edge between the two nodes.
            ret.append(("MATCH (n:node {num: $num1}), (m:node {num: $num2}) "
                        "CREATE (n)-[:et]->(m)",
                        {"num1": message[0], "num2": message[1]}))
    return ret
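With the input produced by runner.sh (the single values 1 through 4 and the pairs `1 2`, `3 4`, `1 4`), this transform yields exactly the four `:node` vertices and three `:et` edges that the tester's `--step verify` queries assert.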