From fa7e214bcf7e1ae84b75e7ced145663deadd2545 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Tue, 19 Jun 2018 14:37:02 +0200 Subject: [PATCH] Add kafka library and integrate it into memgraph Summary: Integrated kafka library into memgraph. This version supports all opencypher features and will only output messages consumed from kafka. Depends on D1434 Next steps are persisting stream metadata and transforming messages in order to store them in the graph. Reviewers: teon.banek, mtomic, mferencevic, buda Reviewed By: teon.banek Subscribers: mferencevic, pullbot, buda Differential Revision: https://phabricator.memgraph.io/D1466 --- .ycm_extra_conf.py | 1 + CMakeLists.txt | 1 + docs/feature_specs/kafka/opencypher.md | 16 +- libs/CMakeLists.txt | 16 ++ libs/setup.sh | 5 + src/CMakeLists.txt | 7 +- src/database/graph_db.cpp | 7 + src/database/graph_db.hpp | 3 + src/integrations/CMakeLists.txt | 7 + src/integrations/kafka/consumer.cpp | 202 ++++++++++++++++++ src/integrations/kafka/consumer.hpp | 66 ++++++ src/integrations/kafka/exceptions.hpp | 59 +++++ src/integrations/kafka/streams.cpp | 68 ++++++ src/integrations/kafka/streams.hpp | 38 ++++ src/query/frontend/ast/ast.capnp | 6 +- src/query/frontend/ast/ast.cpp | 34 ++- src/query/frontend/ast/ast.hpp | 33 +-- .../frontend/ast/cypher_main_visitor.cpp | 33 ++- .../frontend/ast/cypher_main_visitor.hpp | 3 + .../frontend/opencypher/grammar/Cypher.g4 | 18 +- .../frontend/stripped_lexer_constants.hpp | 2 +- src/query/plan/operator.cpp | 150 +++++++++++-- src/query/plan/operator.lcp | 35 ++- src/query/plan/rule_based_planner.hpp | 12 +- tests/unit/CMakeLists.txt | 4 +- tests/unit/cypher_main_visitor.cpp | 94 ++++++-- tests/unit/query_common.hpp | 8 +- tests/unit/query_planner.cpp | 105 ++++++--- 28 files changed, 923 insertions(+), 110 deletions(-) create mode 100644 src/integrations/CMakeLists.txt create mode 100644 src/integrations/kafka/consumer.cpp create mode 100644 src/integrations/kafka/consumer.hpp create mode 
100644 src/integrations/kafka/exceptions.hpp create mode 100644 src/integrations/kafka/streams.cpp create mode 100644 src/integrations/kafka/streams.hpp diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index 99b8f46ff..0e172c1b6 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -35,6 +35,7 @@ BASE_FLAGS = [ '-I./libs/bzip2', '-I./libs/zlib', '-I./libs/rocksdb/include', + '-I./libs/librdkafka/include/librdkafka', '-I./build/include' ] diff --git a/CMakeLists.txt b/CMakeLists.txt index b8db6c080..010ab59ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -154,6 +154,7 @@ include_directories(SYSTEM ${BZIP2_INCLUDE_DIR}) include_directories(SYSTEM ${ZLIB_INCLUDE_DIR}) include_directories(SYSTEM ${ROCKSDB_INCLUDE_DIR}) include_directories(SYSTEM ${CAPNP_INCLUDE_DIR}) +include_directories(SYSTEM ${LIBRDKAFKA_INCLUDE_DIR}) # ----------------------------------------------------------------------------- # openCypher parser ----------------------------------------------------------- diff --git a/docs/feature_specs/kafka/opencypher.md b/docs/feature_specs/kafka/opencypher.md index 22f9dd2a1..7e485f17c 100644 --- a/docs/feature_specs/kafka/opencypher.md +++ b/docs/feature_specs/kafka/opencypher.md @@ -20,15 +20,29 @@ The full openCypher clause for creating a stream is: ```opencypher CREATE STREAM stream_name AS LOAD DATA KAFKA 'URI' + WITH TOPIC 'topic' WITH TRANSFORM 'URI' - [BATCH INTERVAL milliseconds] + [BATCH_INTERVAL milliseconds] + [BATCH_SIZE count] ``` +The `WITH TOPIC` parameter specifies the kafka topic from which we'll stream +data. + The `WITH TRANSFORM` parameter should contain a URI of the transform script. The `BATCH_INTERVAL` parameter defines the time interval in milliseconds that defines the time between two successive stream importing operations. +The `BATCH_SIZE` parameter defines the count of kafka messages that will be +batched together before import. 
+ +If both `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the condition +that is satisfied first will trigger the batched import. + +The default value for `BATCH_INTERVAL` is 100 milliseconds, and the default value +for `BATCH_SIZE` is 10. + The `DROP` clause deletes a stream: ```opencypher DROP STREAM stream_name; diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index e17c04c91..167fccf92 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -233,3 +233,19 @@ set(CAPNP_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnp set(CAPNP_CXX_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnpc-c++ CACHE FILEPATH "Path to capnproto c++ plugin executable" FORCE) mark_as_advanced(CAPNP_INCLUDE_DIR CAPNP_LIBRARY KJ_LIBRARY CAPNP_EXE CAPNP_CXX_EXE) + +# Setup librdkafka. +import_external_library(librdkafka STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include/librdkafka + CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON + -DRDKAFKA_BUILD_EXAMPLES=OFF + -DRDKAFKA_BUILD_TESTS=OFF + -DCMAKE_INSTALL_LIBDIR=lib + -DWITH_SSL=ON + # If we want SASL, we need to install it on build machines + -DWITH_SASL=OFF) + +import_library(librdkafka++ STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a + librdkafka-proj) diff --git a/libs/setup.sh b/libs/setup.sh index 4c72e246a..14c9e5ae0 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -127,3 +127,8 @@ tar -xzf capnproto.tar.gz rm -rf capnproto mv capnproto-c++-0.6.1 capnproto rm capnproto.tar.gz + +# kafka +kafka_tag="c319b4e987d0bc4fe4f01cf91419d90b62061655" # Mar 8, 2018 +# git clone https://github.com/edenhill/librdkafka.git +clone git://deps.memgraph.io/librdkafka.git librdkafka $kafka_tag diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54c51ab23..486725bcd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,8 @@ # CMake configuration for the main memgraph library and executable -# add memgraph sub libraries +# add
memgraph sub libraries, ordered by dependency add_subdirectory(utils) +add_subdirectory(integrations) add_subdirectory(io) add_subdirectory(telemetry) @@ -196,7 +197,7 @@ set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools ${OPENSSL_LIBRARIES} ${Boost_IOSTREAMS_LIBRARY_RELEASE} ${Boost_SERIALIZATION_LIBRARY_RELEASE} - mg-utils mg-io) + mg-utils mg-io mg-integrations) if (USE_LTALLOC) list(APPEND MEMGRAPH_ALL_LIBS ltalloc) @@ -218,7 +219,7 @@ add_dependencies(memgraph_lib generate_capnp) # STATIC library used to store key-value pairs add_library(kvstore_lib STATIC storage/kvstore.cpp) -target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib) +target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib glog gflags) # STATIC library for dummy key-value storage add_library(kvstore_dummy_lib STATIC storage/kvstore_dummy.cpp) diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index bb5dce37f..ddf229b18 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -56,6 +56,9 @@ class PrivateBase : public GraphDb { Storage &storage() override { return *storage_; } durability::WriteAheadLog &wal() override { return wal_; } + integrations::kafka::Streams &kafka_streams() override { + return kafka_streams_; + } int WorkerId() const override { return config_.worker_id; } // Makes a local snapshot from the visibility of accessor @@ -98,6 +101,7 @@ class PrivateBase : public GraphDb { durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, config_.durability_enabled}; + integrations::kafka::Streams kafka_streams_; }; template