diff --git a/.ycm_extra_conf.py b/.ycm_extra_conf.py index 99b8f46ff..0e172c1b6 100644 --- a/.ycm_extra_conf.py +++ b/.ycm_extra_conf.py @@ -35,6 +35,7 @@ BASE_FLAGS = [ '-I./libs/bzip2', '-I./libs/zlib', '-I./libs/rocksdb/include', + '-I./libs/librdkafka/include/librdkafka', '-I./build/include' ] diff --git a/CMakeLists.txt b/CMakeLists.txt index b8db6c080..010ab59ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -154,6 +154,7 @@ include_directories(SYSTEM ${BZIP2_INCLUDE_DIR}) include_directories(SYSTEM ${ZLIB_INCLUDE_DIR}) include_directories(SYSTEM ${ROCKSDB_INCLUDE_DIR}) include_directories(SYSTEM ${CAPNP_INCLUDE_DIR}) +include_directories(SYSTEM ${LIBRDKAFKA_INCLUDE_DIR}) # ----------------------------------------------------------------------------- # openCypher parser ----------------------------------------------------------- diff --git a/docs/feature_specs/kafka/opencypher.md b/docs/feature_specs/kafka/opencypher.md index 22f9dd2a1..7e485f17c 100644 --- a/docs/feature_specs/kafka/opencypher.md +++ b/docs/feature_specs/kafka/opencypher.md @@ -20,15 +20,29 @@ The full openCypher clause for creating a stream is: ```opencypher CREATE STREAM stream_name AS LOAD DATA KAFKA 'URI' + WITH TOPIC 'topic' WITH TRANSFORM 'URI' - [BATCH INTERVAL milliseconds] + [BATCH_INTERVAL milliseconds] + [BATCH_SIZE count] ``` +The `WITH TOPIC` parameter specifies the kafka topic from which we'll stream +data. + The `WITH TRANSFORM` parameter should contain a URI of the transform script. The `BATCH_INTERVAL` parameter defines the time interval in milliseconds that defines the time between two successive stream importing operations. +The `BATCH_SIZE` parameter defines the count of kafka messages that will be +batched together before import. + +If both `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the condition +that is satisfied first will trigger the batched import. 
+ +The default value for `BATCH_INTERVAL` is 100 milliseconds, and the default +value for `BATCH_SIZE` is 10. + The `DROP` clause deletes a stream: ```opencypher DROP STREAM stream_name; diff --git a/libs/CMakeLists.txt b/libs/CMakeLists.txt index e17c04c91..167fccf92 100644 --- a/libs/CMakeLists.txt +++ b/libs/CMakeLists.txt @@ -233,3 +233,19 @@ set(CAPNP_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnp set(CAPNP_CXX_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnpc-c++ CACHE FILEPATH "Path to capnproto c++ plugin executable" FORCE) mark_as_advanced(CAPNP_INCLUDE_DIR CAPNP_LIBRARY KJ_LIBRARY CAPNP_EXE CAPNP_CXX_EXE) + +# Setup librdkafka. +import_external_library(librdkafka STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include/librdkafka + CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON + -DRDKAFKA_BUILD_EXAMPLES=OFF + -DRDKAFKA_BUILD_TESTS=OFF + -DCMAKE_INSTALL_LIBDIR=lib + -DWITH_SSL=ON + # If we want SASL, we need to install it on build machines + -DWITH_SASL=OFF) + +import_library(librdkafka++ STATIC + ${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a + librdkafka-proj) diff --git a/libs/setup.sh b/libs/setup.sh index 4c72e246a..14c9e5ae0 100755 --- a/libs/setup.sh +++ b/libs/setup.sh @@ -127,3 +127,8 @@ tar -xzf capnproto.tar.gz rm -rf capnproto mv capnproto-c++-0.6.1 capnproto rm capnproto.tar.gz + +# kafka +kafka_tag="c319b4e987d0bc4fe4f01cf91419d90b62061655" # Mar 8, 2018 +# git clone https://github.com/edenhill/librdkafka.git +clone git://deps.memgraph.io/librdkafka.git librdkafka $kafka_tag diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54c51ab23..486725bcd 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,7 +1,8 @@ # CMake configuration for the main memgraph library and executable -# add memgraph sub libraries +# add memgraph sub libraries, ordered by dependency add_subdirectory(utils) +add_subdirectory(integrations) add_subdirectory(io) 
add_subdirectory(telemetry) @@ -196,7 +197,7 @@ set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools ${OPENSSL_LIBRARIES} ${Boost_IOSTREAMS_LIBRARY_RELEASE} ${Boost_SERIALIZATION_LIBRARY_RELEASE} - mg-utils mg-io) + mg-utils mg-io mg-integrations) if (USE_LTALLOC) list(APPEND MEMGRAPH_ALL_LIBS ltalloc) @@ -218,7 +219,7 @@ add_dependencies(memgraph_lib generate_capnp) # STATIC library used to store key-value pairs add_library(kvstore_lib STATIC storage/kvstore.cpp) -target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib) +target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib glog gflags) # STATIC library for dummy key-value storage add_library(kvstore_dummy_lib STATIC storage/kvstore_dummy.cpp) diff --git a/src/database/graph_db.cpp b/src/database/graph_db.cpp index bb5dce37f..ddf229b18 100644 --- a/src/database/graph_db.cpp +++ b/src/database/graph_db.cpp @@ -56,6 +56,9 @@ class PrivateBase : public GraphDb { Storage &storage() override { return *storage_; } durability::WriteAheadLog &wal() override { return wal_; } + integrations::kafka::Streams &kafka_streams() override { + return kafka_streams_; + } int WorkerId() const override { return config_.worker_id; } // Makes a local snapshot from the visibility of accessor @@ -98,6 +101,7 @@ class PrivateBase : public GraphDb { durability::WriteAheadLog wal_{config_.worker_id, config_.durability_directory, config_.durability_enabled}; + integrations::kafka::Streams kafka_streams_; }; template