Introduce pulsar dependency (#286)

Antonio Andelic 2021-11-11 12:17:05 +01:00 committed by Antonio Andelic
parent 0ebd52aac3
commit 0e4719018a
40 changed files with 2302 additions and 151 deletions


@ -49,6 +49,7 @@ MEMGRAPH_BUILD_DEPS=(
doxygen graphviz # source documentation generators
which mono-complete dotnet-sdk-3.1 golang nodejs zip unzip java-11-openjdk-devel # for driver tests
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -48,6 +48,7 @@ MEMGRAPH_BUILD_DEPS=(
which mono-complete dotnet-sdk-3.1 nodejs golang zip unzip java-11-openjdk-devel # for driver tests
sbcl # for custom Lisp C++ preprocessing
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -46,6 +46,7 @@ MEMGRAPH_BUILD_DEPS=(
mono-runtime mono-mcs zip unzip default-jdk-headless # for driver tests
dotnet-sdk-3.1 golang nodejs npm
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -47,6 +47,7 @@ MEMGRAPH_BUILD_DEPS=(
mono-runtime mono-mcs zip unzip default-jdk-headless # for driver tests
dotnet-sdk-3.1 golang nodejs npm
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -44,6 +44,7 @@ MEMGRAPH_BUILD_DEPS=(
doxygen graphviz # source documentation generators
mono-runtime mono-mcs nodejs zip unzip default-jdk-headless # for driver tests
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -45,6 +45,7 @@ MEMGRAPH_BUILD_DEPS=(
doxygen graphviz # source documentation generators
mono-runtime mono-mcs nodejs zip unzip default-jdk-headless # driver tests
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {


@ -46,6 +46,7 @@ MEMGRAPH_BUILD_DEPS=(
mono-runtime mono-mcs zip unzip default-jdk-headless # for driver tests
dotnet-sdk-3.1 golang nodejs npm
autoconf # for jemalloc code generation
libtool # for protobuf code generation
)
list() {

libs/.gitignore (vendored, 1 line changed)

@ -5,3 +5,4 @@
!CMakeLists.txt
!__main.cpp
!jemalloc.cmake
!pulsar.patch


@ -48,7 +48,7 @@ endfunction(import_library)
# INSTALL_COMMAND arguments.
function(add_external_project name)
set(options NO_C_COMPILER)
set(one_value_kwargs SOURCE_DIR)
set(one_value_kwargs SOURCE_DIR BUILD_IN_SOURCE)
set(multi_value_kwargs CMAKE_ARGS DEPENDS INSTALL_COMMAND BUILD_COMMAND
CONFIGURE_COMMAND)
cmake_parse_arguments(KW "${options}" "${one_value_kwargs}" "${multi_value_kwargs}" ${ARGN})
@ -56,11 +56,16 @@ function(add_external_project name)
if (KW_SOURCE_DIR)
set(source_dir ${KW_SOURCE_DIR})
endif()
set(build_in_source 0)
if (KW_BUILD_IN_SOURCE)
set(build_in_source ${KW_BUILD_IN_SOURCE})
endif()
if (NOT KW_NO_C_COMPILER)
set(KW_CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} ${KW_CMAKE_ARGS})
endif()
ExternalProject_Add(${name}-proj DEPENDS ${KW_DEPENDS}
PREFIX ${source_dir} SOURCE_DIR ${source_dir}
BUILD_IN_SOURCE ${build_in_source}
CONFIGURE_COMMAND ${KW_CONFIGURE_COMMAND}
CMAKE_ARGS -DCMAKE_BUILD_TYPE=Release
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
@ -169,9 +174,12 @@ import_external_library(bzip2 STATIC
INSTALL_COMMAND true)
# Setup zlib
set(ZLIB_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/zlib)
set(ZLIB_LIBRARIES ${ZLIB_ROOT}/lib/libz.a)
set(ZLIB_INCLUDE_DIRS ${ZLIB_ROOT}/include)
import_external_library(zlib STATIC
${CMAKE_CURRENT_SOURCE_DIR}/zlib/lib/libz.a
${CMAKE_CURRENT_SOURCE_DIR}/zlib
${ZLIB_LIBRARIES}
${ZLIB_INCLUDE_DIRS}
CMAKE_ARGS -DCMAKE_SKIP_INSTALL_ALL_DEPENDENCY=true
BUILD_COMMAND $(MAKE) zlibstatic)
@ -231,6 +239,7 @@ import_external_library(librdkafka STATIC
CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON
-DRDKAFKA_BUILD_EXAMPLES=OFF
-DRDKAFKA_BUILD_TESTS=OFF
-DENABLE_LZ4_EXT=OFF
-DCMAKE_INSTALL_LIBDIR=lib
-DWITH_SSL=ON
# If we want SASL, we need to install it on build machines
@ -242,3 +251,36 @@ import_library(librdkafka++ STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include
)
target_link_libraries(librdkafka++ INTERFACE librdkafka)
set(PROTOBUF_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/protobuf/lib)
import_external_library(protobuf STATIC
${PROTOBUF_ROOT}/lib/libprotobuf.a
${PROTOBUF_ROOT}/include
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND true)
set(BOOST_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/boost/lib)
import_external_library(pulsar STATIC
${CMAKE_CURRENT_SOURCE_DIR}/pulsar/pulsar-client-cpp/lib/libpulsarwithdeps.a
${CMAKE_CURRENT_SOURCE_DIR}/pulsar/install/include
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND cmake pulsar-client-cpp
-DCMAKE_INSTALL_PREFIX=${CMAKE_CURRENT_SOURCE_DIR}/pulsar/install
-DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
-DBUILD_DYNAMIC_LIB=OFF
-DBUILD_STATIC_LIB=ON
-DBUILD_TESTS=OFF
-DLINK_STATIC=ON
-DPROTOC_PATH=${PROTOBUF_ROOT}/bin/protoc
-DBOOST_ROOT=${BOOST_ROOT}
-DCMAKE_PREFIX_PATH=${PROTOBUF_ROOT}
-DProtobuf_INCLUDE_DIRS=${PROTOBUF_ROOT}/include
-DZLIB_LIBRARIES=${ZLIB_LIBRARIES}
-DZLIB_INCLUDE_DIRS=${ZLIB_INCLUDE_DIRS}
-DBUILD_PYTHON_WRAPPER=OFF
-DBUILD_PERF_TOOLS=OFF
-DUSE_LOG4CXX=OFF
BUILD_COMMAND $(MAKE) pulsarStaticWithDeps)
add_dependencies(pulsar-proj protobuf zlib)

libs/pulsar.patch (new file, 1508 lines)

File diff suppressed because it is too large.


@ -14,39 +14,49 @@ clone () {
local git_repo=$1
local dir_name=$2
local checkout_id=$3
shift 3
local shallow=$4
shift 4
# Clone if there's no repo.
if [[ ! -d "$dir_name" ]]; then
echo "Cloning from $git_repo"
# If the clone fails, it doesn't make sense to continue with the function
# execution but the whole script should continue executing because we might
# clone the same repo from a different source.
git clone "$git_repo" "$dir_name" || return 1
if [ "$shallow" = true ]; then
git clone --depth 1 --branch "$checkout_id" "$git_repo" "$dir_name" || return 1
else
git clone "$git_repo" "$dir_name" || return 1
fi
fi
pushd "$dir_name"
# Just fetch new commits from remote repository. Don't merge/pull them in, so
# that we don't clobber local modifications.
git fetch
# Check whether we have any local changes which need to be preserved.
local local_changes=true
if git diff --no-ext-diff --quiet && git diff --no-ext-diff --cached --quiet; then
local_changes=false
fi
# Stash regardless of local_changes, so that a user gets a message on stdout.
git stash
# Checkout the primary commit (there's no need to pull/merge).
# The checkout fail should exit this script immediately because the target
# commit is not there and that will most likely create build-time errors.
git checkout "$checkout_id" || exit 1
# Apply any optional cherry pick fixes.
while [[ $# -ne 0 ]]; do
local cherry_pick_id=$1
shift
# The cherry-pick fail should exit this script immediately because the
# target commit is not there and that will most likely create build-time
# errors.
git cherry-pick -n "$cherry_pick_id" || exit 1
done
if [ "$shallow" = false ]; then
# Stash regardless of local_changes, so that a user gets a message on stdout.
git stash
# Just fetch new commits from remote repository. Don't merge/pull them in, so
# that we don't clobber local modifications.
git fetch
# Checkout the primary commit (there's no need to pull/merge).
# The checkout fail should exit this script immediately because the target
# commit is not there and that will most likely create build-time errors.
git checkout "$checkout_id" || exit 1
# Apply any optional cherry pick fixes.
while [[ $# -ne 0 ]]; do
local cherry_pick_id=$1
shift
# The cherry-pick fail should exit this script immediately because the
# target commit is not there and that will most likely create build-time
# errors.
git cherry-pick -n "$cherry_pick_id" || exit 1
done
fi
# Reapply any local changes.
if [[ $local_changes == true ]]; then
git stash pop
@ -70,12 +80,13 @@ repo_clone_try_double () {
secondary_url="$2"
folder_name="$3"
ref="$4"
shallow="${5:-false}"
echo "Cloning primary from $primary_url secondary from $secondary_url"
if [ -z "$primary_url" ]; then echo "Primary should not be empty." && exit 1; fi
if [ -z "$secondary_url" ]; then echo "Secondary should not be empty." && exit 1; fi
if [ -z "$folder_name" ]; then echo "Clone folder should not be empty." && exit 1; fi
if [ -z "$ref" ]; then echo "Git clone ref should not be empty." && exit 1; fi
clone "$primary_url" "$folder_name" "$ref" || clone "$secondary_url" "$folder_name" "$ref" || exit 1
clone "$primary_url" "$folder_name" "$ref" "$shallow" || clone "$secondary_url" "$folder_name" "$ref" "$shallow" || exit 1
echo ""
}
@ -113,6 +124,9 @@ declare -A primary_urls=(
["nlohmann"]="http://$local_cache_host/file/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp"
["neo4j"]="http://$local_cache_host/file/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="http://$local_cache_host/git/librdkafka.git"
["protobuf"]="http://$local_cache_host/git/protobuf.git"
["boost"]="https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.gz"
["pulsar"]="https://github.com/apache/pulsar.git"
)
# The goal of secondary urls is to have links to the "source of truth" of
@ -140,13 +154,16 @@ declare -A secondary_urls=(
["nlohmann"]="https://raw.githubusercontent.com/nlohmann/json/b3e5cb7f20dcc5c806e418df34324eca60d17d4e/single_include/nlohmann/json.hpp"
["neo4j"]="https://s3-eu-west-1.amazonaws.com/deps.memgraph.io/neo4j-community-3.2.3-unix.tar.gz"
["librdkafka"]="https://github.com/edenhill/librdkafka.git"
["protobuf"]="https://github.com/protocolbuffers/protobuf.git"
["boost"]="https://boostorg.jfrog.io/artifactory/main/release/1.77.0/source/boost_1_77_0.tar.gz"
["pulsar"]="https://github.com/apache/pulsar.git"
)
# antlr
file_get_try_double "${primary_urls[antlr4-generator]}" "${secondary_urls[antlr4-generator]}"
antlr4_tag="5e5b6d35b4183fd330102c40947b95c4b5c6abb5" # v4.9.2
repo_clone_try_double "${primary_urls[antlr4-code]}" "${secondary_urls[antlr4-code]}" "antlr4" "$antlr4_tag"
antlr4_tag="4.9.2" # v4.9.2
repo_clone_try_double "${primary_urls[antlr4-code]}" "${secondary_urls[antlr4-code]}" "antlr4" "$antlr4_tag" true
# remove shared library from install dependencies
sed -i 's/install(TARGETS antlr4_shared/install(TARGETS antlr4_shared OPTIONAL/' antlr4/runtime/Cpp/runtime/CMakeLists.txt
# fix issue https://github.com/antlr/antlr4/issues/3194 - should update Antlr commit once the PR related to the issue gets merged
@ -161,20 +178,20 @@ cppitertools_ref="cb3635456bdb531121b82b4d2e3afc7ae1f56d47"
repo_clone_try_double "${primary_urls[cppitertools]}" "${secondary_urls[cppitertools]}" "cppitertools" "$cppitertools_ref"
# fmt
fmt_tag="7bdf0628b1276379886c7f6dda2cef2b3b374f0b" # (2020-11-25)
repo_clone_try_double "${primary_urls[fmt]}" "${secondary_urls[fmt]}" "fmt" "$fmt_tag"
fmt_tag="7.1.3" # (2020-11-25)
repo_clone_try_double "${primary_urls[fmt]}" "${secondary_urls[fmt]}" "fmt" "$fmt_tag" true
# rapidcheck
rapidcheck_tag="7bc7d302191a4f3d0bf005692677126136e02f60" # (2020-05-04)
repo_clone_try_double "${primary_urls[rapidcheck]}" "${secondary_urls[rapidcheck]}" "rapidcheck" "$rapidcheck_tag"
# google benchmark
benchmark_tag="4f8bfeae470950ef005327973f15b0044eceaceb" # v1.1.0
repo_clone_try_double "${primary_urls[gbenchmark]}" "${secondary_urls[gbenchmark]}" "benchmark" "$benchmark_tag"
benchmark_tag="v1.1.0"
repo_clone_try_double "${primary_urls[gbenchmark]}" "${secondary_urls[gbenchmark]}" "benchmark" "$benchmark_tag" true
# google test
googletest_tag="ec44c6c1675c25b9827aacd08c02433cccde7780" # v1.8.0
repo_clone_try_double "${primary_urls[gtest]}" "${secondary_urls[gtest]}" "googletest" "$googletest_tag"
googletest_tag="release-1.8.0"
repo_clone_try_double "${primary_urls[gtest]}" "${secondary_urls[gtest]}" "googletest" "$googletest_tag" true
# google flags
gflags_tag="b37ceb03a0e56c9f15ce80409438a555f8a67b7c" # custom version (May 6, 2017)
@ -201,19 +218,19 @@ cd ..
bzip2_tag="0405487e2b1de738e7f1c8afb50d19cf44e8d580" # v1.0.6 (May 26, 2011)
repo_clone_try_double "${primary_urls[bzip2]}" "${secondary_urls[bzip2]}" "bzip2" "$bzip2_tag"
zlib_tag="cacf7f1d4e3d44d871b605da3b647f07d718623f" # v1.2.11.
repo_clone_try_double "${primary_urls[zlib]}" "${secondary_urls[zlib]}" "zlib" "$zlib_tag"
zlib_tag="v1.2.11" # v1.2.11.
repo_clone_try_double "${primary_urls[zlib]}" "${secondary_urls[zlib]}" "zlib" "$zlib_tag" true
# remove shared library from install dependencies
sed -i 's/install(TARGETS zlib zlibstatic/install(TARGETS zlibstatic/g' zlib/CMakeLists.txt
rocksdb_tag="f3e33549c151f30ac4eb7c22356c6d0331f37652" # (2020-10-14)
repo_clone_try_double "${primary_urls[rocksdb]}" "${secondary_urls[rocksdb]}" "rocksdb" "$rocksdb_tag"
rocksdb_tag="v6.14.6" # (2020-10-14)
repo_clone_try_double "${primary_urls[rocksdb]}" "${secondary_urls[rocksdb]}" "rocksdb" "$rocksdb_tag" true
# remove shared library from install dependencies
sed -i 's/TARGETS ${ROCKSDB_SHARED_LIB}/TARGETS ${ROCKSDB_SHARED_LIB} OPTIONAL/' rocksdb/CMakeLists.txt
# mgclient
mgclient_tag="v1.3.0" # (2021-09-23)
repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag"
repo_clone_try_double "${primary_urls[mgclient]}" "${secondary_urls[mgclient]}" "mgclient" "$mgclient_tag" true
sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt
# pymgclient
@ -222,10 +239,10 @@ repo_clone_try_double "${primary_urls[pymgclient]}" "${secondary_urls[pymgclient
# mgconsole
mgconsole_tag="v1.1.0" # (2021-10-07)
repo_clone_try_double "${primary_urls[mgconsole]}" "${secondary_urls[mgconsole]}" "mgconsole" "$mgconsole_tag"
repo_clone_try_double "${primary_urls[mgconsole]}" "${secondary_urls[mgconsole]}" "mgconsole" "$mgconsole_tag" true
spdlog_tag="46d418164dd4cd9822cf8ca62a116a3f71569241" # (2020-12-01)
repo_clone_try_double "${primary_urls[spdlog]}" "${secondary_urls[spdlog]}" "spdlog" "$spdlog_tag"
spdlog_tag="v1.8.2" # (2020-12-01)
repo_clone_try_double "${primary_urls[spdlog]}" "${secondary_urls[spdlog]}" "spdlog" "$spdlog_tag" true
jemalloc_tag="ea6b3e973b477b8061e0076bb257dbd7f3faa756" # (2021-02-11)
repo_clone_try_double "${primary_urls[jemalloc]}" "${secondary_urls[jemalloc]}" "jemalloc" "$jemalloc_tag"
@ -248,4 +265,27 @@ popd
# librdkafka
librdkafka_tag="v1.7.0" # (2021-05-06)
repo_clone_try_double "${primary_urls[librdkafka]}" "${secondary_urls[librdkafka]}" "librdkafka" "$librdkafka_tag"
repo_clone_try_double "${primary_urls[librdkafka]}" "${secondary_urls[librdkafka]}" "librdkafka" "$librdkafka_tag" true
# protobuf
protobuf_tag="v3.12.4"
repo_clone_try_double "${primary_urls[protobuf]}" "${secondary_urls[protobuf]}" "protobuf" "$protobuf_tag" true
pushd protobuf
./autogen.sh && ./configure --prefix=$(pwd)/lib
popd
# boost
file_get_try_double "${primary_urls[boost]}" "${secondary_urls[boost]}"
tar -xzf boost_1_77_0.tar.gz
mv boost_1_77_0 boost
pushd boost
./bootstrap.sh --prefix=$(pwd)/lib --with-libraries="system,regex"
./b2 -j$(nproc) install
popd
# pulsar
pulsar_tag="v2.8.1"
repo_clone_try_double "${primary_urls[pulsar]}" "${secondary_urls[pulsar]}" "pulsar" "$pulsar_tag" true
pushd pulsar
git apply ../pulsar.patch
popd


@ -1 +1,2 @@
add_subdirectory(kafka)
add_subdirectory(pulsar)


@ -0,0 +1,22 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <chrono>
namespace integrations {
constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize{1000};
constexpr int64_t kDefaultCheckBatchLimit{1};
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1};
} // namespace integrations
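
Both the Kafka and the new Pulsar integration read their batching defaults from this shared header. A minimal sketch of the fallback pattern they use (illustrative only, not part of the commit):

#include <chrono>
#include <cstdint>
#include <optional>

#include "integrations/constants.hpp"

// Sketch: fall back to the shared defaults when a stream did not configure an
// explicit batch interval or size.
std::chrono::milliseconds EffectiveBatchInterval(std::optional<std::chrono::milliseconds> configured) {
  return configured.value_or(integrations::kDefaultBatchInterval);
}

int64_t EffectiveBatchSize(std::optional<int64_t> configured) {
  return configured.value_or(integrations::kDefaultBatchSize);
}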


@ -19,6 +19,8 @@
#include <librdkafka/rdkafkacpp.h>
#include <spdlog/spdlog.h>
#include "integrations/constants.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "utils/exceptions.hpp"
#include "utils/logging.hpp"
@ -27,13 +29,6 @@
namespace integrations::kafka {
constexpr std::chrono::milliseconds kDefaultBatchInterval{100};
constexpr int64_t kDefaultBatchSize = 1000;
constexpr int64_t kDefaultCheckBatchLimit = 1;
constexpr std::chrono::milliseconds kDefaultCheckTimeout{30000};
constexpr std::chrono::milliseconds kMinimumInterval{1};
constexpr int64_t kMinimumSize{1};
namespace {
utils::BasicResult<std::string, std::vector<Message>> GetBatch(RdKafka::KafkaConsumer &consumer,
const ConsumerInfo &info,
@ -114,8 +109,9 @@ int64_t Message::Offset() const {
return c_message->offset;
}
Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, ConsumerFunction consumer_function)
Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
: info_{std::move(info)}, consumer_function_(std::move(consumer_function)), cb_(info_.consumer_name) {
MG_ASSERT(consumer_function_, "Empty consumer function for Kafka consumer");
// NOLINTNEXTLINE (modernize-use-nullptr)
if (info_.batch_interval.value_or(kMinimumInterval) < kMinimumInterval) {
@ -148,7 +144,7 @@ Consumer::Consumer(const std::string &bootstrap_servers, ConsumerInfo info, Cons
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}
if (conf->set("bootstrap.servers", bootstrap_servers, error) != RdKafka::Conf::CONF_OK) {
if (conf->set("bootstrap.servers", info_.bootstrap_servers, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.consumer_name, error);
}


@ -83,6 +83,7 @@ struct ConsumerInfo {
std::string consumer_name;
std::vector<std::string> topics;
std::string consumer_group;
std::string bootstrap_servers;
std::optional<std::chrono::milliseconds> batch_interval;
std::optional<int64_t> batch_size;
};
@ -97,7 +98,7 @@ class Consumer final : public RdKafka::EventCb {
///
/// @throws ConsumerFailedToInitializeException if the consumer can't connect
/// to the Kafka endpoint.
Consumer(const std::string &bootstrap_servers, ConsumerInfo info, ConsumerFunction consumer_function);
Consumer(ConsumerInfo info, ConsumerFunction consumer_function);
~Consumer() override;
Consumer(const Consumer &other) = delete;
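
With the bootstrap servers folded into ConsumerInfo, constructing a Kafka consumer now takes only the info struct and the callback. A sketch with placeholder values (assuming the usual std::function-based ConsumerFunction, as used in the unit tests further down):

#include <optional>
#include <utility>
#include <vector>

#include "integrations/kafka/consumer.hpp"

void ExampleKafkaConsumer() {
  integrations::kafka::ConsumerInfo info{
      .consumer_name = "example-consumer",    // placeholder
      .topics = {"example-topic"},            // placeholder
      .consumer_group = "example-group",      // placeholder
      .bootstrap_servers = "localhost:9092",  // now part of ConsumerInfo
      .batch_interval = std::nullopt,
      .batch_size = std::nullopt,
  };
  integrations::kafka::Consumer consumer{
      std::move(info),
      [](const std::vector<integrations::kafka::Message> &batch) { /* process the batch */ }};
  consumer.Start();
}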


@ -0,0 +1,8 @@
set(integrations_pulsar_src_files
consumer.cpp
)
find_package(CURL REQUIRED)
add_library(mg-integrations-pulsar STATIC ${integrations_pulsar_src_files})
target_link_libraries(mg-integrations-pulsar mg-utils pulsar Threads::Threads ${CURL_LIBRARIES})


@ -0,0 +1,292 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#include "integrations/pulsar/consumer.hpp"
#include <fmt/format.h>
#include <pulsar/Client.h>
#include <pulsar/InitialPosition.h>
#include <chrono>
#include <thread>
#include "integrations/constants.hpp"
#include "integrations/pulsar/exceptions.hpp"
#include "utils/concepts.hpp"
#include "utils/logging.hpp"
#include "utils/on_scope_exit.hpp"
#include "utils/result.hpp"
#include "utils/thread.hpp"
namespace integrations::pulsar {
namespace {
template <typename T>
concept PulsarConsumer = utils::SameAsAnyOf<T, pulsar_client::Consumer, pulsar_client::Reader>;
pulsar_client::Result ConsumeMessage(pulsar_client::Consumer &consumer, pulsar_client::Message &message,
int remaining_timeout_in_ms) {
return consumer.receive(message, remaining_timeout_in_ms);
}
pulsar_client::Result ConsumeMessage(pulsar_client::Reader &reader, pulsar_client::Message &message,
int remaining_timeout_in_ms) {
return reader.readNext(message, remaining_timeout_in_ms);
}
template <PulsarConsumer TConsumer>
utils::BasicResult<std::string, std::vector<Message>> GetBatch(TConsumer &consumer, const ConsumerInfo &info,
std::atomic<bool> &is_running) {
std::vector<Message> batch{};
const auto batch_size = info.batch_size.value_or(kDefaultBatchSize);
batch.reserve(batch_size);
auto remaining_timeout_in_ms = info.batch_interval.value_or(kDefaultBatchInterval).count();
auto start = std::chrono::steady_clock::now();
for (int64_t i = 0; remaining_timeout_in_ms > 0 && i < batch_size && is_running.load(); ++i) {
pulsar_client::Message message;
const auto result = ConsumeMessage(consumer, message, remaining_timeout_in_ms);
switch (result) {
case pulsar_client::Result::ResultTimeout:
return std::move(batch);
case pulsar_client::Result::ResultOk:
batch.emplace_back(Message{std::move(message)});
break;
default:
spdlog::warn(fmt::format("Unexpected error while consuming message from consumer {}, error: {}",
info.consumer_name, result));
return {pulsar_client::strResult(result)};
}
auto now = std::chrono::steady_clock::now();
auto took = std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
return std::move(batch);
}
class SpdlogLogger : public pulsar_client::Logger {
bool isEnabled(Level /*level*/) override { return spdlog::should_log(spdlog::level::trace); }
void log(Level /*level*/, int /*line*/, const std::string &message) override {
spdlog::trace("[Pulsar] {}", message);
}
};
class SpdlogLoggerFactory : public pulsar_client::LoggerFactory {
pulsar_client::Logger *getLogger(const std::string & /*file_name*/) override {
if (!logger_) {
logger_ = std::make_unique<SpdlogLogger>();
}
return logger_.get();
}
private:
std::unique_ptr<SpdlogLogger> logger_;
};
pulsar_client::Client CreateClient(const std::string &service_url) {
static SpdlogLoggerFactory logger_factory;
pulsar_client::ClientConfiguration conf;
conf.setLogger(&logger_factory);
return {service_url, conf};
}
} // namespace
Message::Message(pulsar_client::Message &&message) : message_{std::move(message)} {}
std::span<const char> Message::Payload() const {
return {static_cast<const char *>(message_.getData()), message_.getLength()};
}
Consumer::Consumer(ConsumerInfo info, ConsumerFunction consumer_function)
: info_{std::move(info)},
client_{CreateClient(info_.service_url)},
consumer_function_{std::move(consumer_function)} {
pulsar_client::ConsumerConfiguration config;
config.setSubscriptionInitialPosition(pulsar_client::InitialPositionLatest);
config.setConsumerType(pulsar_client::ConsumerType::ConsumerExclusive);
if (pulsar_client::Result result = client_.subscribe(info_.topics, info_.consumer_name, config, consumer_);
result != pulsar_client::ResultOk) {
throw ConsumerFailedToInitializeException(info_.consumer_name, pulsar_client::strResult(result));
}
}
Consumer::~Consumer() {
StopIfRunning();
consumer_.close();
client_.close();
}
bool Consumer::IsRunning() const { return is_running_; }
const ConsumerInfo &Consumer::Info() const { return info_; }
void Consumer::Start() {
if (is_running_) {
throw ConsumerRunningException(info_.consumer_name);
}
StartConsuming();
}
void Consumer::Stop() {
if (!is_running_) {
throw ConsumerStoppedException(info_.consumer_name);
}
StopConsuming();
}
void Consumer::StopIfRunning() {
if (is_running_) {
StopConsuming();
}
if (thread_.joinable()) {
thread_.join();
}
}
void Consumer::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const {
// NOLINTNEXTLINE (modernize-use-nullptr)
if (timeout.value_or(kMinimumInterval) < kMinimumInterval) {
throw ConsumerCheckFailedException(info_.consumer_name, "Timeout has to be positive!");
}
if (limit_batches.value_or(kMinimumSize) < kMinimumSize) {
throw ConsumerCheckFailedException(info_.consumer_name, "Batch limit has to be positive!");
}
// The implementation of this function is questionable: it is const qualified, though it changes the inner state of
// PulsarConsumer. Though it changes the inner state, it saves the current assignment for future Check/Start calls to
// restore the current state, so the changes made by this function shouldn't be visible for the users of the class. It
// also passes a non const reference of PulsarConsumer to GetBatch function. That means the object is bitwise const
// (PulsarConsumer is stored in unique_ptr) and internally mostly synchronized. Mostly, because as Start/Stop requires
// exclusive access to consumer, so we don't have to deal with simultaneous calls to those functions. The only concern
// in this function is to prevent executing this function on multiple threads simultaneously.
if (is_running_.exchange(true)) {
throw ConsumerRunningException(info_.consumer_name);
}
utils::OnScopeExit restore_is_running([this] { is_running_.store(false); });
const auto num_of_batches = limit_batches.value_or(kDefaultCheckBatchLimit);
const auto timeout_to_use = timeout.value_or(kDefaultCheckTimeout);
const auto start = std::chrono::steady_clock::now();
if (info_.topics.size() != 1) {
throw ConsumerCheckFailedException(info_.consumer_name, "Check cannot be used for consumers with multiple topics.");
}
std::vector<std::string> partitions;
const auto &topic = info_.topics.front();
client_.getPartitionsForTopic(topic, partitions);
if (partitions.size() > 1) {
throw ConsumerCheckFailedException(info_.consumer_name, "Check cannot be used for topics with multiple partitions");
}
pulsar_client::Reader reader;
client_.createReader(topic, last_message_id_, {}, reader);
for (int64_t i = 0; i < num_of_batches;) {
const auto now = std::chrono::steady_clock::now();
// NOLINTNEXTLINE (modernize-use-nullptr)
if (now - start >= timeout_to_use) {
throw ConsumerCheckFailedException(info_.consumer_name, "Timeout reached");
}
auto maybe_batch = GetBatch(reader, info_, is_running_);
if (maybe_batch.HasError()) {
throw ConsumerCheckFailedException(info_.consumer_name, maybe_batch.GetError());
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
++i;
try {
check_consumer_function(batch);
} catch (const std::exception &e) {
spdlog::warn("Pulsar consumer {} check failed with error {}", info_.consumer_name, e.what());
throw ConsumerCheckFailedException(info_.consumer_name, e.what());
}
}
reader.close();
}
void Consumer::StartConsuming() {
MG_ASSERT(!is_running_, "Cannot start already running consumer!");
if (thread_.joinable()) {
thread_.join();
}
is_running_.store(true);
thread_ = std::thread([this] {
constexpr auto kMaxThreadNameSize = utils::GetMaxThreadNameSize();
const auto full_thread_name = "Cons#" + info_.consumer_name;
utils::ThreadSetName(full_thread_name.substr(0, kMaxThreadNameSize));
while (is_running_) {
auto maybe_batch = GetBatch(consumer_, info_, is_running_);
if (maybe_batch.HasError()) {
spdlog::warn("Error happened in consumer {} while fetching messages: {}!", info_.consumer_name,
maybe_batch.GetError());
break;
}
const auto &batch = maybe_batch.GetValue();
if (batch.empty()) {
continue;
}
spdlog::info("Pulsar consumer {} is processing a batch", info_.consumer_name);
try {
consumer_function_(batch);
if (std::any_of(batch.begin(), batch.end(), [&](const auto &message) {
if (const auto result = consumer_.acknowledge(message.message_); result != pulsar_client::ResultOk) {
spdlog::warn("Acknowledging a message of consumer {} failed: {}", info_.consumer_name, result);
return true;
}
last_message_id_ = message.message_.getMessageId();
return false;
})) {
break;
}
} catch (const std::exception &e) {
spdlog::warn("Error happened in consumer {} while processing a batch: {}!", info_.consumer_name, e.what());
break;
}
spdlog::info("Pulsar consumer {} finished processing", info_.consumer_name);
}
is_running_.store(false);
});
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) {
thread_.join();
}
}
} // namespace integrations::pulsar


@ -0,0 +1,81 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <atomic>
#include <optional>
#include <span>
#include <thread>
#include <pulsar/Client.h>
namespace integrations::pulsar {
namespace pulsar_client = ::pulsar;
class Consumer;
class Message final {
public:
explicit Message(pulsar_client::Message &&message);
std::span<const char> Payload() const;
private:
pulsar_client::Message message_;
friend Consumer;
};
using ConsumerFunction = std::function<void(const std::vector<Message> &)>;
struct ConsumerInfo {
std::optional<int64_t> batch_size;
std::optional<std::chrono::milliseconds> batch_interval;
std::vector<std::string> topics;
std::string consumer_name;
std::string service_url;
};
class Consumer final {
public:
Consumer(ConsumerInfo info, ConsumerFunction consumer_function);
~Consumer();
Consumer(const Consumer &) = delete;
Consumer(Consumer &&) noexcept = delete;
Consumer &operator=(const Consumer &) = delete;
Consumer &operator=(Consumer &&) = delete;
bool IsRunning() const;
void Start();
void Stop();
void StopIfRunning();
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> limit_batches,
const ConsumerFunction &check_consumer_function) const;
const ConsumerInfo &Info() const;
private:
void StartConsuming();
void StopConsuming();
ConsumerInfo info_;
mutable pulsar_client::Client client_;
pulsar_client::Consumer consumer_;
ConsumerFunction consumer_function_;
mutable std::atomic<bool> is_running_{false};
pulsar_client::MessageId last_message_id_{pulsar_client::MessageId::earliest()};
std::thread thread_;
};
} // namespace integrations::pulsar
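
A usage sketch of the new Pulsar consumer wrapper, based on the interface above; topic, consumer name and service URL are placeholders:

#include <optional>
#include <utility>
#include <vector>

#include "integrations/pulsar/consumer.hpp"

void ExamplePulsarConsumer() {
  integrations::pulsar::ConsumerInfo info{
      .batch_size = std::nullopt,                // falls back to kDefaultBatchSize
      .batch_interval = std::nullopt,            // falls back to kDefaultBatchInterval
      .topics = {"example-topic"},               // placeholder
      .consumer_name = "example-consumer",       // placeholder
      .service_url = "pulsar://localhost:6650",  // placeholder
  };
  integrations::pulsar::Consumer consumer{
      std::move(info), [](const std::vector<integrations::pulsar::Message> &batch) {
        for (const auto &message : batch) {
          const auto payload = message.Payload();  // std::span<const char>
          static_cast<void>(payload);              // hand the bytes to a transformation here
        }
      }};
  consumer.Start();
  // ...
  consumer.StopIfRunning();
}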


@ -0,0 +1,58 @@
// Copyright 2021 Memgraph Ltd.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
// License, and you may not use this file except in compliance with the Business Source License.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
#pragma once
#include <string>
#include "utils/exceptions.hpp"
namespace integrations::pulsar {
class PulsarStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
class ConsumerFailedToInitializeException : public PulsarStreamException {
public:
ConsumerFailedToInitializeException(const std::string &consumer_name, const std::string &error)
: PulsarStreamException("Failed to initialize Pulsar consumer {} : {}", consumer_name, error) {}
};
class ConsumerRunningException : public PulsarStreamException {
public:
explicit ConsumerRunningException(const std::string &consumer_name)
: PulsarStreamException("Pulsar consumer {} is already running", consumer_name) {}
};
class ConsumerStoppedException : public PulsarStreamException {
public:
explicit ConsumerStoppedException(const std::string &consumer_name)
: PulsarStreamException("Pulsar consumer {} is already stopped", consumer_name) {}
};
class ConsumerCheckFailedException : public PulsarStreamException {
public:
explicit ConsumerCheckFailedException(const std::string &consumer_name, const std::string &error)
: PulsarStreamException("Pulsar consumer {} check failed: {}", consumer_name, error) {}
};
class ConsumerStartFailedException : public PulsarStreamException {
public:
explicit ConsumerStartFailedException(const std::string &consumer_name, const std::string &error)
: PulsarStreamException("Starting Pulsar consumer {} failed: {}", consumer_name, error) {}
};
class TopicNotFoundException : public PulsarStreamException {
public:
TopicNotFoundException(const std::string &consumer_name, const std::string &topic_name)
: PulsarStreamException("Pulsar consumer {} cannot find topic {}", consumer_name, topic_name) {}
};
} // namespace integrations::pulsar


@ -1119,11 +1119,11 @@ int main(int argc, char **argv) {
}
storage::Storage db(db_config);
query::InterpreterContext interpreter_context{
&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv}, .execution_timeout_sec = FLAGS_query_execution_timeout_sec},
FLAGS_data_directory,
FLAGS_kafka_bootstrap_servers};
query::InterpreterContext interpreter_context{&db,
{.query = {.allow_load_csv = FLAGS_allow_load_csv},
.execution_timeout_sec = FLAGS_query_execution_timeout_sec,
.default_kafka_bootstrap_servers = FLAGS_kafka_bootstrap_servers},
FLAGS_data_directory};
#ifdef MG_ENTERPRISE
SessionData session_data{&db, &interpreter_context, &auth, &audit_log};
#else
@ -1133,9 +1133,6 @@ int main(int argc, char **argv) {
query::procedure::gModuleRegistry.SetModulesDirectory(query_modules_directories);
query::procedure::gModuleRegistry.UnloadAndLoadModulesFromDirectories();
// As the Stream transformations are using modules, they have to be restored after the query modules are loaded.
interpreter_context.streams.RestoreStreams();
AuthQueryHandler auth_handler(&auth, FLAGS_auth_user_or_role_name_regex);
AuthChecker auth_checker{&auth};
interpreter_context.auth = &auth_handler;
@ -1151,6 +1148,9 @@ int main(int argc, char **argv) {
interpreter_context.auth_checker);
}
// As the Stream transformations are using modules, they have to be restored after the query modules are loaded.
interpreter_context.streams.RestoreStreams();
ServerContext context;
std::string service_name = "Bolt";
if (!FLAGS_bolt_key_file.empty() && !FLAGS_bolt_cert_file.empty()) {


@ -19,13 +19,12 @@
#include <cstdlib>
#endif
#include "utils/likely.hpp"
#include "utils/memory_tracker.hpp"
namespace {
void *newImpl(const std::size_t size) {
auto *ptr = malloc(size);
if (LIKELY(ptr != nullptr)) {
if (ptr != nullptr) [[likely]] {
return ptr;
}
@ -34,7 +33,7 @@ void *newImpl(const std::size_t size) {
void *newImpl(const std::size_t size, const std::align_val_t align) {
auto *ptr = aligned_alloc(static_cast<std::size_t>(align), size);
if (LIKELY(ptr != nullptr)) {
if (ptr != nullptr) [[likely]] {
return ptr;
}
@ -47,14 +46,22 @@ void *newNoExcept(const std::size_t size, const std::align_val_t align) noexcept
}
#if USE_JEMALLOC
void deleteImpl(void *ptr) noexcept { dallocx(ptr, 0); }
void deleteImpl(void *ptr) noexcept {
if (ptr == nullptr) [[unlikely]] {
return;
}
dallocx(ptr, 0);
}
void deleteImpl(void *ptr, const std::align_val_t align) noexcept {
if (ptr == nullptr) [[unlikely]] {
return;
}
dallocx(ptr, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
void deleteSized(void *ptr, const std::size_t size) noexcept {
if (UNLIKELY(ptr == nullptr)) {
if (ptr == nullptr) [[unlikely]] {
return;
}
@ -62,7 +69,7 @@ void deleteSized(void *ptr, const std::size_t size) noexcept {
}
void deleteSized(void *ptr, const std::size_t size, const std::align_val_t align) noexcept {
if (UNLIKELY(ptr == nullptr)) {
if (ptr == nullptr) [[unlikely]] {
return;
}
@ -81,7 +88,7 @@ void deleteSized(void *ptr, const std::size_t /*unused*/, const std::align_val_t
void TrackMemory(std::size_t size) {
#if USE_JEMALLOC
if (LIKELY(size != 0)) {
if (size != 0) [[likely]] {
size = nallocx(size, 0);
}
#endif
@ -90,7 +97,7 @@ void TrackMemory(std::size_t size) {
void TrackMemory(std::size_t size, const std::align_val_t align) {
#if USE_JEMALLOC
if (LIKELY(size != 0)) {
if (size != 0) [[likely]] {
size = nallocx(size, MALLOCX_ALIGN(align)); // NOLINT(hicpp-signed-bitwise)
}
#endif
@ -120,7 +127,7 @@ bool TrackMemoryNoExcept(const std::size_t size, const std::align_val_t align) {
void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
if (ptr != nullptr) [[likely]] {
utils::total_memory_tracker.Free(sallocx(ptr, 0));
}
#else
@ -138,7 +145,7 @@ void UntrackMemory([[maybe_unused]] void *ptr, [[maybe_unused]] std::size_t size
void UntrackMemory(void *ptr, const std::align_val_t align, [[maybe_unused]] std::size_t size = 0) noexcept {
try {
#if USE_JEMALLOC
if (LIKELY(ptr != nullptr)) {
if (ptr != nullptr) [[likely]] {
utils::total_memory_tracker.Free(sallocx(ptr, MALLOCX_ALIGN(align))); // NOLINT(hicpp-signed-bitwise)
}
#else
@ -176,28 +183,28 @@ void *operator new[](const std::size_t size, const std::align_val_t align) {
}
void *operator new(const std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size))) {
if (TrackMemoryNoExcept(size)) [[likely]] {
return newNoExcept(size);
}
return nullptr;
}
void *operator new[](const std::size_t size, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size))) {
if (TrackMemoryNoExcept(size)) [[likely]] {
return newNoExcept(size);
}
return nullptr;
}
void *operator new(const std::size_t size, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size, align))) {
if (TrackMemoryNoExcept(size, align)) [[likely]] {
return newNoExcept(size, align);
}
return nullptr;
}
void *operator new[](const std::size_t size, const std::align_val_t align, const std::nothrow_t & /*unused*/) noexcept {
if (LIKELY(TrackMemoryNoExcept(size, align))) {
if (TrackMemoryNoExcept(size, align)) [[likely]] {
return newNoExcept(size, align);
}
return nullptr;
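
The edits in this file replace the LIKELY/UNLIKELY macros with the standard C++20 attributes. A self-contained sketch of the pattern (not code from the commit):

#include <cstddef>
#include <cstdlib>

// `if (LIKELY(cond))` becomes `if (cond) [[likely]]`,
// `if (UNLIKELY(cond))` becomes `if (cond) [[unlikely]]`.
void *CheckedAlloc(std::size_t size) {
  auto *ptr = std::malloc(size);
  if (ptr != nullptr) [[likely]] {
    return ptr;  // hot path
  }
  std::abort();  // allocation failure is the cold path
}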


@ -44,7 +44,7 @@ add_library(mg-query STATIC ${mg_query_sources})
add_dependencies(mg-query generate_lcp_query)
target_include_directories(mg-query PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_link_libraries(mg-query dl cppitertools)
target_link_libraries(mg-query mg-integrations-kafka mg-storage-v2 mg-utils mg-kvstore mg-memory)
target_link_libraries(mg-query mg-integrations-pulsar mg-integrations-kafka mg-storage-v2 mg-utils mg-kvstore mg-memory)
if("${MG_PYTHON_VERSION}" STREQUAL "")
find_package(Python3 3.5 REQUIRED COMPONENTS Development)
else()


@ -10,6 +10,7 @@
// licenses/APL.txt.
#pragma once
#include <string>
namespace query {
struct InterpreterConfig {
@ -19,5 +20,7 @@ struct InterpreterConfig {
// The default execution timeout is 10 minutes.
double execution_timeout_sec{600.0};
std::string default_kafka_bootstrap_servers;
};
} // namespace query
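
A sketch of populating the new field; the value is a placeholder, and the nested .query member plus the real wiring from FLAGS_kafka_bootstrap_servers are taken from the main() hunk further down:

// Include of the interpreter config header omitted; its in-tree path is not shown here.
query::InterpreterConfig ExampleConfig() {
  return query::InterpreterConfig{
      .query = {.allow_load_csv = true},
      .execution_timeout_sec = 600.0,
      .default_kafka_bootstrap_servers = "localhost:9092",  // placeholder
  };
}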


@ -560,7 +560,9 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
batch_size = GetOptionalValue<int64_t>(stream_query->batch_size_, evaluator),
transformation_name = stream_query->transform_name_, bootstrap_servers = std::move(bootstrap),
owner = StringPointerToOptional(username)]() mutable {
std::string bootstrap = bootstrap_servers ? std::move(*bootstrap_servers) : "";
std::string bootstrap = bootstrap_servers
? std::move(*bootstrap_servers)
: std::string{interpreter_context->config.default_kafka_bootstrap_servers};
interpreter_context->streams.Create<query::KafkaStream>(
stream_name,
{.common_info = {.batch_interval = batch_interval,
@ -570,6 +572,15 @@ Callback HandleStreamQuery(StreamQuery *stream_query, const Parameters &paramete
.consumer_group = std::move(consumer_group),
.bootstrap_servers = std::move(bootstrap)},
std::move(owner));
// interpreter_context->streams.Create<query::PulsarStream>(
// stream_name,
// {.common_info = {.batch_interval = batch_interval,
// .batch_size = batch_size,
// .transformation_name = std::move(transformation_name)},
// .topics = std::move(topic_names),
// .service_url = std::move(bootstrap)},
// std::move(owner));
return std::vector<std::vector<TypedValue>>{};
};
notifications->emplace_back(SeverityLevel::INFO, NotificationCode::CREATE_STREAM,
@ -918,11 +929,8 @@ using RWType = plan::ReadWriteTypeChecker::RWType;
} // namespace
InterpreterContext::InterpreterContext(storage::Storage *db, const InterpreterConfig config,
const std::filesystem::path &data_directory, std::string kafka_bootstrap_servers)
: db(db),
trigger_store(data_directory / "triggers"),
config(config),
streams{this, std::move(kafka_bootstrap_servers), data_directory / "streams"} {}
const std::filesystem::path &data_directory)
: db(db), trigger_store(data_directory / "triggers"), config(config), streams{this, data_directory / "streams"} {}
Interpreter::Interpreter(InterpreterContext *interpreter_context) : interpreter_context_(interpreter_context) {
MG_ASSERT(interpreter_context_, "Interpreter context must not be NULL");


@ -165,7 +165,7 @@ struct PreparedQuery {
*/
struct InterpreterContext {
explicit InterpreterContext(storage::Storage *db, InterpreterConfig config,
const std::filesystem::path &data_directory, std::string kafka_bootstrap_servers);
const std::filesystem::path &data_directory);
storage::Storage *db;


@ -2500,6 +2500,8 @@ mgp_error mgp_message_payload(mgp_message *message, const char **result) {
using MessageType = std::decay_t<T>;
if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
return msg->Payload().data();
} else if constexpr (std::same_as<MessageType, mgp_message::PulsarMessage>) {
return msg.Payload().data();
} else {
throw std::invalid_argument("Invalid source type");
}
@ -2517,6 +2519,8 @@ mgp_error mgp_message_payload_size(mgp_message *message, size_t *result) {
using MessageType = std::decay_t<T>;
if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
return msg->Payload().size();
} else if constexpr (std::same_as<MessageType, mgp_message::PulsarMessage>) {
return msg.Payload().size();
} else {
throw std::invalid_argument("Invalid source type");
}
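
The two variant alternatives are stored differently — the Kafka message as a pointer, the Pulsar message by value — which is why the branches above use msg-> versus msg. A condensed sketch of the same dispatch as a standalone helper (illustrative only):

#include <concepts>
#include <cstddef>
#include <type_traits>
#include <variant>

// Header defining mgp_message omitted; its in-tree path is not shown here.
size_t ExamplePayloadSize(const mgp_message &message) {
  return std::visit(
      [](const auto &msg) -> size_t {
        using MessageType = std::decay_t<decltype(msg)>;
        if constexpr (std::same_as<MessageType, mgp_message::KafkaMessage>) {
          return msg->Payload().size();  // KafkaMessage is a const pointer
        } else {
          return msg.Payload().size();  // PulsarMessage is held by value
        }
      },
      message.msg);
}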


@ -20,6 +20,7 @@
#include <ostream>
#include "integrations/kafka/consumer.hpp"
#include "integrations/pulsar/consumer.hpp"
#include "query/context.hpp"
#include "query/db_accessor.hpp"
#include "query/procedure/cypher_type_ptr.hpp"
@ -802,9 +803,11 @@ bool IsValidIdentifierName(const char *name);
struct mgp_message {
explicit mgp_message(const integrations::kafka::Message &message) : msg{&message} {}
explicit mgp_message(const integrations::pulsar::Message &message) : msg{message} {}
using KafkaMessage = const integrations::kafka::Message *;
std::variant<KafkaMessage> msg;
using PulsarMessage = integrations::pulsar::Message;
std::variant<KafkaMessage, PulsarMessage> msg;
};
struct mgp_messages {


@ -61,7 +61,7 @@ concept Stream = requires(TStream stream) {
requires ConvertableToJson<typename TStream::StreamInfo>;
};
enum class StreamSourceType : uint8_t { KAFKA };
enum class StreamSourceType : uint8_t { KAFKA, PULSAR };
template <Stream T>
StreamSourceType StreamType(const T & /*stream*/);


@ -20,10 +20,11 @@ KafkaStream::KafkaStream(std::string stream_name, StreamInfo stream_info,
.consumer_name = std::move(stream_name),
.topics = std::move(stream_info.topics),
.consumer_group = std::move(stream_info.consumer_group),
.bootstrap_servers = std::move(stream_info.bootstrap_servers),
.batch_interval = stream_info.common_info.batch_interval,
.batch_size = stream_info.common_info.batch_size,
};
consumer_.emplace(std::move(stream_info.bootstrap_servers), std::move(consumer_info), std::move(consumer_function));
consumer_.emplace(std::move(consumer_info), std::move(consumer_function));
};
KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const {
@ -32,7 +33,8 @@ KafkaStream::StreamInfo KafkaStream::Info(std::string transformation_name) const
.batch_size = info.batch_size,
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.consumer_group = info.consumer_group};
.consumer_group = info.consumer_group,
.bootstrap_servers = info.bootstrap_servers};
}
void KafkaStream::Start() { consumer_->Start(); }
@ -67,4 +69,49 @@ void from_json(const nlohmann::json &data, KafkaStream::StreamInfo &info) {
data.at(kConsumerGroupKey).get_to(info.consumer_group);
data.at(kBoostrapServers).get_to(info.bootstrap_servers);
}
PulsarStream::PulsarStream(std::string stream_name, StreamInfo stream_info,
ConsumerFunction<integrations::pulsar::Message> consumer_function) {
integrations::pulsar::ConsumerInfo consumer_info{.batch_size = stream_info.common_info.batch_size,
.batch_interval = stream_info.common_info.batch_interval,
.topics = std::move(stream_info.topics),
.consumer_name = std::move(stream_name),
.service_url = std::move(stream_info.service_url)};
consumer_.emplace(std::move(consumer_info), std::move(consumer_function));
};
PulsarStream::StreamInfo PulsarStream::Info(std::string transformation_name) const {
const auto &info = consumer_->Info();
return {{.batch_interval = info.batch_interval,
.batch_size = info.batch_size,
.transformation_name = std::move(transformation_name)},
.topics = info.topics,
.service_url = info.service_url};
}
void PulsarStream::Start() { consumer_->Start(); }
void PulsarStream::Stop() { consumer_->Stop(); }
bool PulsarStream::IsRunning() const { return consumer_->IsRunning(); }
void PulsarStream::Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const {
consumer_->Check(timeout, batch_limit, consumer_function);
}
namespace {
const std::string kServiceUrl{"service_url"};
} // namespace
void to_json(nlohmann::json &data, PulsarStream::StreamInfo &&info) {
data[kCommonInfoKey] = std::move(info.common_info);
data[kTopicsKey] = std::move(info.topics);
data[kServiceUrl] = std::move(info.service_url);
}
void from_json(const nlohmann::json &data, PulsarStream::StreamInfo &info) {
data.at(kCommonInfoKey).get_to(info.common_info);
data.at(kTopicsKey).get_to(info.topics);
data.at(kServiceUrl).get_to(info.service_url);
}
} // namespace query
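
An illustrative round-trip through the serializers above for the persisted Pulsar stream metadata; all values are placeholders:

#include <nlohmann/json.hpp>  // include path may differ in-tree
// Header declaring PulsarStream omitted; its in-tree path is not shown here.

void ExampleRoundTrip() {
  nlohmann::json data;
  query::to_json(data, query::PulsarStream::StreamInfo{
                           .common_info = {.batch_interval = std::nullopt,
                                           .batch_size = std::nullopt,
                                           .transformation_name = "example.transformation"},
                           .topics = {"example-topic"},
                           .service_url = "pulsar://localhost:6650"});
  query::PulsarStream::StreamInfo restored;
  query::from_json(data, restored);
}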


@ -14,6 +14,7 @@
#include "query/stream/common.hpp"
#include "integrations/kafka/consumer.hpp"
#include "integrations/pulsar/consumer.hpp"
namespace query {
@ -54,4 +55,37 @@ inline StreamSourceType StreamType(const KafkaStream & /*stream*/) {
return StreamSourceType::KAFKA;
}
struct PulsarStream {
struct StreamInfo {
CommonStreamInfo common_info;
std::vector<std::string> topics;
std::string service_url;
};
using Message = integrations::pulsar::Message;
PulsarStream(std::string stream_name, StreamInfo stream_info, ConsumerFunction<Message> consumer_function);
StreamInfo Info(std::string transformation_name) const;
void Start();
void Stop();
bool IsRunning() const;
void Check(std::optional<std::chrono::milliseconds> timeout, std::optional<int64_t> batch_limit,
const ConsumerFunction<Message> &consumer_function) const;
private:
using Consumer = integrations::pulsar::Consumer;
std::optional<Consumer> consumer_;
};
void to_json(nlohmann::json &data, PulsarStream::StreamInfo &&info);
void from_json(const nlohmann::json &data, PulsarStream::StreamInfo &info);
template <>
inline StreamSourceType StreamType(const PulsarStream & /*stream*/) {
return StreamSourceType::PULSAR;
}
} // namespace query
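
A sketch of registering a Pulsar-backed stream through the generic Streams API, mirroring the commented-out call in the interpreter hunk earlier; every value below is a placeholder:

// Interpreter and stream headers omitted; their in-tree paths are not shown here.
void ExampleCreatePulsarStream(query::InterpreterContext *interpreter_context) {
  interpreter_context->streams.Create<query::PulsarStream>(
      "example_stream",
      {.common_info = {.batch_interval = std::nullopt,
                       .batch_size = std::nullopt,
                       .transformation_name = "example.transformation"},
       .topics = {"example-topic"},
       .service_url = "pulsar://localhost:6650"},
      /*owner=*/std::nullopt);
}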


@ -164,11 +164,9 @@ struct Overloaded : Ts... {
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
} // namespace
Streams::Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers,
std::filesystem::path directory)
: interpreter_context_(interpreter_context),
bootstrap_servers_(std::move(bootstrap_servers)),
storage_(std::move(directory)) {
Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path directory)
: interpreter_context_(interpreter_context), storage_(std::move(directory)) {
constexpr std::string_view proc_name = "kafka_set_stream_offset";
auto set_stream_offset = [this, proc_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory * /*memory*/) {
@ -221,6 +219,8 @@ void Streams::Create(const std::string &stream_name, typename TStream::StreamInf
template void Streams::Create<KafkaStream>(const std::string &stream_name, KafkaStream::StreamInfo info,
std::optional<std::string> owner);
template void Streams::Create<PulsarStream>(const std::string &stream_name, PulsarStream::StreamInfo info,
std::optional<std::string> owner);
template <Stream TStream>
Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std::string &stream_name,
@ -276,10 +276,6 @@ Streams::StreamsMap::iterator Streams::CreateConsumer(StreamsMap &map, const std
result.rows.clear();
};
if (stream_info.bootstrap_servers.empty()) {
stream_info.bootstrap_servers = bootstrap_servers_;
}
auto insert_result = map.try_emplace(
stream_name, StreamData<TStream>{std::move(stream_info.common_info.transformation_name), std::move(owner),
std::make_unique<SynchronizedStreamSource<TStream>>(
@ -313,7 +309,7 @@ void Streams::RestoreStreams() {
MG_ASSERT(status.name == stream_name, "Expected stream name is '{}', but got '{}'", status.name, stream_name);
try {
auto it = CreateConsumer<T>(*locked_streams_map, stream_name, std::move(status.info), {});
auto it = CreateConsumer<T>(*locked_streams_map, stream_name, std::move(status.info), std::move(status.owner));
if (status.is_running) {
std::visit(
[&](auto &&stream_data) {
@ -335,6 +331,9 @@ void Streams::RestoreStreams() {
case StreamSourceType::KAFKA:
create_consumer(StreamStatus<KafkaStream>{}, std::move(stream_json_data));
break;
case StreamSourceType::PULSAR:
create_consumer(StreamStatus<PulsarStream>{}, std::move(stream_json_data));
break;
}
}
}
@ -474,6 +473,4 @@ TransformationResult Streams::Check(const std::string &stream_name, std::optiona
it->second);
}
std::string_view Streams::BootstrapServers() const { return bootstrap_servers_; }
} // namespace query


@ -75,9 +75,8 @@ class Streams final {
/// Initializes the streams.
///
/// @param interpreter_context context to use to run the result of transformations
/// @param bootstrap_servers initial list of brokers as a comma separated list of broker host or host:port
/// @param directory a directory path to store the persisted streams metadata
Streams(InterpreterContext *interpreter_context, std::string bootstrap_servers, std::filesystem::path directory);
Streams(InterpreterContext *interpreter_context, std::filesystem::path directory);
/// Restores the streams from the persisted metadata.
/// The restoration is done in a best effort manner, therefore no exception is thrown on failure, but the error is
@ -163,7 +162,7 @@ class Streams final {
std::unique_ptr<SynchronizedStreamSource<TStream>> stream_source;
};
using StreamDataVariant = std::variant<StreamData<KafkaStream>>;
using StreamDataVariant = std::variant<StreamData<KafkaStream>, StreamData<PulsarStream>>;
using StreamsMap = std::unordered_map<std::string, StreamDataVariant>;
using SynchronizedStreamsMap = utils::Synchronized<StreamsMap, utils::WritePrioritizedRWLock>;
@ -180,7 +179,6 @@ class Streams final {
}
InterpreterContext *interpreter_context_;
std::string bootstrap_servers_;
kvstore::KVStore storage_;
SynchronizedStreamsMap streams_;


@ -48,7 +48,7 @@ class ExpansionBenchFixture : public benchmark::Fixture {
MG_ASSERT(db->CreateIndex(label));
interpreter_context.emplace(&*db, query::InterpreterConfig{}, data_directory, "non existing bootstrap servers");
interpreter_context.emplace(&*db, query::InterpreterConfig{}, data_directory);
interpreter.emplace(&*interpreter_context);
}


@ -31,8 +31,7 @@ int main(int argc, char *argv[]) {
utils::OnScopeExit([&data_directory] { std::filesystem::remove_all(data_directory); });
utils::license::global_license_checker.EnableTesting();
query::InterpreterContext interpreter_context{&db, query::InterpreterConfig{}, data_directory,
"non existing bootstrap servers"};
query::InterpreterContext interpreter_context{&db, query::InterpreterConfig{}, data_directory};
query::Interpreter interpreter{&interpreter_context};
ResultStreamFaker stream(&db);


@ -53,7 +53,7 @@ target_link_libraries(storage_test_utils mg-storage-v2)
# Test integrations-kafka
add_library(kafka-mock STATIC kafka_mock.cpp)
target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads zlib gtest)
target_link_libraries(kafka-mock mg-utils librdkafka++ librdkafka Threads::Threads gtest)
# Include directories are intentionally not set, because kafka-mock isn't meant to be used apart from unit tests
add_unit_test(integrations_kafka_consumer.cpp kafka_mock.cpp)


@ -51,6 +51,7 @@ struct ConsumerTest : public ::testing::Test {
.consumer_name = "Consumer" + test_name,
.topics = {kTopicName},
.consumer_group = "ConsumerGroup " + test_name,
.bootstrap_servers = cluster.Bootstraps(),
.batch_interval = std::nullopt,
.batch_size = std::nullopt,
};
@ -76,8 +77,7 @@ struct ConsumerTest : public ::testing::Test {
}
};
auto consumer =
std::make_unique<Consumer>(cluster.Bootstraps(), std::move(info), std::move(consumer_function_wrapper));
auto consumer = std::make_unique<Consumer>(std::move(info), std::move(consumer_function_wrapper));
int sent_messages{1};
SeedTopicWithInt(kTopicName, sent_messages);
@ -169,7 +169,7 @@ TEST_F(ConsumerTest, BatchInterval) {
}
TEST_F(ConsumerTest, StartStop) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
auto start = [&consumer](const bool use_conditional) {
if (use_conditional) {
@ -279,41 +279,41 @@ TEST_F(ConsumerTest, BatchSize) {
TEST_F(ConsumerTest, InvalidBootstrapServers) {
auto info = CreateDefaultConsumerInfo();
info.bootstrap_servers = "non.existing.host:9092";
EXPECT_THROW(Consumer("non.existing.host:9092", std::move(info), kDummyConsumerFunction),
ConsumerFailedToInitializeException);
EXPECT_THROW(Consumer(std::move(info), kDummyConsumerFunction), ConsumerFailedToInitializeException);
}
TEST_F(ConsumerTest, InvalidTopic) {
auto info = CreateDefaultConsumerInfo();
info.topics = {"Non existing topic"};
EXPECT_THROW(Consumer(cluster.Bootstraps(), std::move(info), kDummyConsumerFunction), TopicNotFoundException);
EXPECT_THROW(Consumer(std::move(info), kDummyConsumerFunction), TopicNotFoundException);
}
TEST_F(ConsumerTest, InvalidBatchInterval) {
auto info = CreateDefaultConsumerInfo();
info.batch_interval = std::chrono::milliseconds{0};
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
EXPECT_THROW(Consumer(info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_interval = std::chrono::milliseconds{-1};
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
EXPECT_THROW(Consumer(info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_interval = std::chrono::milliseconds{1};
EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction));
EXPECT_NO_THROW(Consumer(info, kDummyConsumerFunction));
}
TEST_F(ConsumerTest, InvalidBatchSize) {
auto info = CreateDefaultConsumerInfo();
info.batch_size = 0;
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
EXPECT_THROW(Consumer(info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_size = -1;
EXPECT_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
EXPECT_THROW(Consumer(info, kDummyConsumerFunction), ConsumerFailedToInitializeException);
info.batch_size = 1;
EXPECT_NO_THROW(Consumer(cluster.Bootstraps(), info, kDummyConsumerFunction));
EXPECT_NO_THROW(Consumer(info, kDummyConsumerFunction));
}
TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
@ -348,7 +348,7 @@ TEST_F(ConsumerTest, DISABLED_StartsFromPreviousOffset) {
std::string_view{kMessagePrefix + std::to_string(received_message_count + sent_messages)});
}
auto expected_total_messages = received_message_count + batch_count;
auto consumer = std::make_unique<Consumer>(cluster.Bootstraps(), ConsumerInfo{info}, consumer_function);
auto consumer = std::make_unique<Consumer>(ConsumerInfo{info}, consumer_function);
ASSERT_FALSE(consumer->IsRunning());
consumer->Start();
const auto start = std::chrono::steady_clock::now();
@ -419,7 +419,7 @@ TEST_F(ConsumerTest, CheckMethodWorks) {
}
TEST_F(ConsumerTest, CheckMethodTimeout) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
std::chrono::milliseconds timeout{3000};
@ -433,7 +433,7 @@ TEST_F(ConsumerTest, CheckMethodTimeout) {
}
TEST_F(ConsumerTest, CheckWithInvalidTimeout) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::chrono::milliseconds{0}, std::nullopt, kDummyConsumerFunction),
@ -448,7 +448,7 @@ TEST_F(ConsumerTest, CheckWithInvalidTimeout) {
}
TEST_F(ConsumerTest, CheckWithInvalidBatchSize) {
Consumer consumer{cluster.Bootstraps(), CreateDefaultConsumerInfo(), kDummyConsumerFunction};
Consumer consumer{CreateDefaultConsumerInfo(), kDummyConsumerFunction};
const auto start = std::chrono::steady_clock::now();
EXPECT_THROW(consumer.Check(std::nullopt, 0, kDummyConsumerFunction), ConsumerCheckFailedException);
@ -482,9 +482,9 @@ TEST_F(ConsumerTest, ConsumerStatus) {
EXPECT_EQ(topics[1], info.topics[1]);
};
Consumer consumer{cluster.Bootstraps(),
ConsumerInfo{kConsumerName, topics, kConsumerGroupName, kBatchInterval, kBatchSize},
kDummyConsumerFunction};
Consumer consumer{
ConsumerInfo{kConsumerName, topics, kConsumerGroupName, cluster.Bootstraps(), kBatchInterval, kBatchSize},
kDummyConsumerFunction};
check_info(consumer.Info());
consumer.Start();


@ -54,8 +54,7 @@ auto ToEdgeList(const communication::bolt::Value &v) {
struct InterpreterFaker {
InterpreterFaker(storage::Storage *db, const query::InterpreterConfig config,
const std::filesystem::path &data_directory)
: interpreter_context(db, config, data_directory, "not used bootstrap servers"),
interpreter(&interpreter_context) {
: interpreter_context(db, config, data_directory), interpreter(&interpreter_context) {
interpreter_context.auth_checker = &auth_checker;
}


@ -203,7 +203,7 @@ DatabaseState GetState(storage::Storage *db) {
auto Execute(storage::Storage *db, const std::string &query) {
auto data_directory = std::filesystem::temp_directory_path() / "MG_tests_unit_query_dump";
query::InterpreterContext context(db, query::InterpreterConfig{}, data_directory, "non existing bootstrap servers");
query::InterpreterContext context(db, query::InterpreterConfig{}, data_directory);
query::Interpreter interpreter(&context);
ResultStreamFaker stream(db);
@ -746,9 +746,7 @@ TEST(DumpTest, ExecuteDumpDatabase) {
class StatefulInterpreter {
public:
explicit StatefulInterpreter(storage::Storage *db)
: db_(db),
context_(db_, query::InterpreterConfig{}, data_directory_, "non existing bootstrap servers"),
interpreter_(&context_) {}
: db_(db), context_(db_, query::InterpreterConfig{}, data_directory_), interpreter_(&context_) {}
auto Execute(const std::string &query) {
ResultStreamFaker stream(db_);


@ -35,7 +35,7 @@ class QueryExecution : public testing::Test {
void SetUp() {
db_.emplace();
interpreter_context_.emplace(&*db_, query::InterpreterConfig{}, data_directory, "non existing bootstrap servers");
interpreter_context_.emplace(&*db_, query::InterpreterConfig{}, data_directory);
interpreter_.emplace(&*interpreter_context_);
}


@ -39,21 +39,6 @@ std::string GetDefaultStreamName() {
return std::string{::testing::UnitTest::GetInstance()->current_test_info()->name()};
}
StreamInfo CreateDefaultStreamInfo() {
return StreamInfo{.common_info{
.batch_interval = std::nullopt,
.batch_size = std::nullopt,
.transformation_name = "not used in the tests",
},
.topics = {kTopicName},
.consumer_group = "ConsumerGroup " + GetDefaultStreamName(),
.bootstrap_servers = ""};
}
StreamCheckData CreateDefaultStreamCheckData() {
return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false, std::nullopt};
}
std::filesystem::path GetCleanDataDirectory() {
const auto path = std::filesystem::temp_directory_path() / "query-streams";
std::filesystem::remove_all(path);
@ -74,14 +59,11 @@ class StreamsTest : public ::testing::Test {
// Streams constructor.
// InterpreterContext::auth_checker_ is used in the Streams object, but only in the message processing part. Because
// these tests don't send any messages, the auth_checker_ pointer can be left as nullptr.
query::InterpreterContext interpreter_context_{&db_, query::InterpreterConfig{}, data_directory_,
"dont care bootstrap servers"};
query::InterpreterContext interpreter_context_{&db_, query::InterpreterConfig{}, data_directory_};
std::filesystem::path streams_data_directory_{data_directory_ / "separate-dir-for-test"};
std::optional<Streams> streams_;
void ResetStreamsObject() {
streams_.emplace(&interpreter_context_, mock_cluster_.Bootstraps(), streams_data_directory_);
}
void ResetStreamsObject() { streams_.emplace(&interpreter_context_, streams_data_directory_); }
void CheckStreamStatus(const StreamCheckData &check_data) {
SCOPED_TRACE(fmt::format("Checking status of '{}'", check_data.name));
@ -106,6 +88,21 @@ class StreamsTest : public ::testing::Test {
check_data.is_running = false;
}
StreamInfo CreateDefaultStreamInfo() {
return StreamInfo{.common_info{
.batch_interval = std::nullopt,
.batch_size = std::nullopt,
.transformation_name = "not used in the tests",
},
.topics = {kTopicName},
.consumer_group = "ConsumerGroup " + GetDefaultStreamName(),
.bootstrap_servers = mock_cluster_.Bootstraps()};
}
StreamCheckData CreateDefaultStreamCheckData() {
return {GetDefaultStreamName(), CreateDefaultStreamInfo(), false, std::nullopt};
}
void Clear() {
if (!std::filesystem::exists(data_directory_)) return;
std::filesystem::remove_all(data_directory_);