Procedure for stream configs (#301)

This commit is contained in:
Antonio Andelic 2021-11-29 08:56:10 +01:00 committed by GitHub
parent d277dd49a3
commit aabec99a8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 320 additions and 32 deletions

View File

@ -18,6 +18,7 @@
#include <spdlog/spdlog.h>
#include <json/json.hpp>
#include "mg_procedure.h"
#include "query/db_accessor.hpp"
#include "query/discard_value_stream.hpp"
#include "query/exceptions.hpp"
@ -157,37 +158,281 @@ void from_json(const nlohmann::json &data, StreamStatus<TStream> &status) {
from_json(data, status.info);
}
namespace {
template <typename Fun>
[[nodiscard]] bool TryOrSetError(Fun &&func, mgp_result *result) {
if (const auto err = func(); err == MGP_ERROR_UNABLE_TO_ALLOCATE) {
static_cast<void>(mgp_result_set_error_msg(result, "Not enough memory!"));
return false;
} else if (err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unexpected error ({})!", err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
[[nodiscard]] auto GetStringValueOrSetError(const char *string, mgp_memory *memory, mgp_result *result) {
procedure::MgpUniquePtr<mgp_value> value{nullptr, mgp_value_destroy};
const auto success =
TryOrSetError([&] { return procedure::CreateMgpObject(value, mgp_value_make_string, string, memory); }, result);
if (!success) {
value.reset();
}
return value;
}
[[nodiscard]] bool InsertResultOrSetError(mgp_result *result, mgp_result_record *record, const auto *result_name,
mgp_value *value) {
if (const auto err = mgp_result_record_insert(record, result_name, value); err != MGP_ERROR_NO_ERROR) {
const auto error_msg = fmt::format("Unable to set the result for {}, error = {}", result_name, err);
static_cast<void>(mgp_result_set_error_msg(result, error_msg.c_str()));
return false;
}
return true;
}
} // namespace
Streams::Streams(InterpreterContext *interpreter_context, std::filesystem::path directory)
: interpreter_context_(interpreter_context), storage_(std::move(directory)) {
constexpr std::string_view proc_name = "kafka_set_stream_offset";
auto set_stream_offset = [this, proc_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory * /*memory*/) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto *arg_offset = procedure::Call<mgp_value *>(mgp_list_at, args, 1);
const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(utils::Overloaded{
[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto error = stream_source_ptr->SetStreamOffset(offset);
if (error.HasError()) {
MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
"Unable to set procedure error message of procedure: {}", proc_name);
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
}},
it->second);
};
RegisterProcedures();
}
mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) == MGP_ERROR_NO_ERROR);
void Streams::RegisterProcedures() {
RegisterKafkaProcedures();
RegisterPulsarProcedures();
}
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
void Streams::RegisterKafkaProcedures() {
{
constexpr std::string_view proc_name = "kafka_set_stream_offset";
auto set_stream_offset = [this, proc_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory * /*memory*/) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto *arg_offset = procedure::Call<mgp_value *>(mgp_list_at, args, 1);
const auto offset = procedure::Call<int64_t>(mgp_value_get_int, arg_offset);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(utils::Overloaded{
[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto error = stream_source_ptr->SetStreamOffset(offset);
if (error.HasError()) {
MG_ASSERT(mgp_result_set_error_msg(result, error.GetError().c_str()) == MGP_ERROR_NO_ERROR,
"Unable to set procedure error message of procedure: {}", proc_name);
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
}},
it->second);
};
mgp_proc proc(proc_name, set_stream_offset, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_arg(&proc, "offset", procedure::Call<mgp_type *>(mgp_type_int)) == MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
{
constexpr std::string_view proc_name = "kafka_stream_info";
constexpr std::string_view consumer_group_result_name = "consumer_group";
constexpr std::string_view topics_result_name = "topics";
constexpr std::string_view bootstrap_servers_result_name = "bootstrap_servers";
auto get_stream_info = [this, proc_name, consumer_group_result_name, topics_result_name,
bootstrap_servers_result_name](mgp_list *args, mgp_graph * /*graph*/, mgp_result *result,
mgp_memory *memory) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(
utils::Overloaded{
[&](StreamData<KafkaStream> &kafka_stream) {
auto stream_source_ptr = kafka_stream.stream_source->Lock();
const auto info = stream_source_ptr->Info(kafka_stream.transformation_name);
mgp_result_record *record{nullptr};
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
const auto consumer_group_value = GetStringValueOrSetError(info.consumer_group.c_str(), memory, result);
if (!consumer_group_value) {
return;
}
procedure::MgpUniquePtr<mgp_list> topic_names{nullptr, mgp_list_destroy};
{
const auto success = TryOrSetError(
[&] {
return procedure::CreateMgpObject(topic_names, mgp_list_make_empty, info.topics.size(), memory);
},
result);
if (!success) {
return;
}
}
for (const auto &topic : info.topics) {
auto topic_value = GetStringValueOrSetError(topic.c_str(), memory, result);
if (!topic_value) {
return;
}
topic_names->elems.push_back(std::move(*topic_value));
}
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
[&] {
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
result);
if (!success) {
return;
}
}
const auto bootstrap_servers_value =
GetStringValueOrSetError(info.bootstrap_servers.c_str(), memory, result);
if (!bootstrap_servers_value) {
return;
}
if (!InsertResultOrSetError(result, record, consumer_group_result_name.data(),
consumer_group_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, bootstrap_servers_result_name.data(),
bootstrap_servers_value.get())) {
return;
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Kafka stream sources", proc_name);
}},
it->second);
};
mgp_proc proc(proc_name, get_stream_info, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, consumer_group_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(
mgp_proc_add_result(&proc, topics_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_list, procedure::Call<mgp_type *>(mgp_type_string))) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, bootstrap_servers_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
}
void Streams::RegisterPulsarProcedures() {
{
constexpr std::string_view proc_name = "pulsar_stream_info";
constexpr std::string_view service_url_result_name = "service_url";
constexpr std::string_view topics_result_name = "topics";
auto get_stream_info = [this, proc_name, service_url_result_name, topics_result_name](
mgp_list *args, mgp_graph * /*graph*/, mgp_result *result, mgp_memory *memory) {
auto *arg_stream_name = procedure::Call<mgp_value *>(mgp_list_at, args, 0);
const auto *stream_name = procedure::Call<const char *>(mgp_value_get_string, arg_stream_name);
auto lock_ptr = streams_.Lock();
auto it = GetStream(*lock_ptr, std::string(stream_name));
std::visit(
utils::Overloaded{
[&](StreamData<PulsarStream> &pulsar_stream) {
auto stream_source_ptr = pulsar_stream.stream_source->Lock();
const auto info = stream_source_ptr->Info(pulsar_stream.transformation_name);
mgp_result_record *record{nullptr};
{
const auto success = TryOrSetError([&] { return mgp_result_new_record(result, &record); }, result);
if (!success) {
return;
}
}
auto service_url_value = GetStringValueOrSetError(info.service_url.c_str(), memory, result);
if (!service_url_value) {
return;
}
procedure::MgpUniquePtr<mgp_list> topic_names{nullptr, mgp_list_destroy};
{
const auto success = TryOrSetError(
[&] {
return procedure::CreateMgpObject(topic_names, mgp_list_make_empty, info.topics.size(), memory);
},
result);
if (!success) {
return;
}
}
for (const auto &topic : info.topics) {
auto topic_value = GetStringValueOrSetError(topic.c_str(), memory, result);
if (!topic_value) {
return;
}
topic_names->elems.push_back(std::move(*topic_value));
}
procedure::MgpUniquePtr<mgp_value> topics_value{nullptr, mgp_value_destroy};
{
const auto success = TryOrSetError(
[&] {
return procedure::CreateMgpObject(topics_value, mgp_value_make_list, topic_names.release());
},
result);
if (!success) {
return;
}
}
if (!InsertResultOrSetError(result, record, topics_result_name.data(), topics_value.get())) {
return;
}
if (!InsertResultOrSetError(result, record, service_url_result_name.data(), service_url_value.get())) {
return;
}
},
[proc_name](auto && /*other*/) {
throw QueryRuntimeException("'{}' can be only used for Pulsar stream sources", proc_name);
}},
it->second);
};
mgp_proc proc(proc_name, get_stream_info, utils::NewDeleteResource(), false);
MG_ASSERT(mgp_proc_add_arg(&proc, "stream_name", procedure::Call<mgp_type *>(mgp_type_string)) ==
MGP_ERROR_NO_ERROR);
MG_ASSERT(mgp_proc_add_result(&proc, service_url_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_string)) == MGP_ERROR_NO_ERROR);
MG_ASSERT(
mgp_proc_add_result(&proc, topics_result_name.data(),
procedure::Call<mgp_type *>(mgp_type_list, procedure::Call<mgp_type *>(mgp_type_string))) ==
MGP_ERROR_NO_ERROR);
procedure::gModuleRegistry.RegisterMgProcedure(proc_name, std::move(proc));
}
}
template <Stream TStream>

View File

@ -178,6 +178,10 @@ class Streams final {
}
}
void RegisterProcedures();
void RegisterKafkaProcedures();
void RegisterPulsarProcedures();
InterpreterContext *interpreter_context_;
kvstore::KVStore storage_;

View File

@ -124,11 +124,14 @@ def drop_stream(cursor, stream_name):
assert get_stream_info(cursor, stream_name) is None
def validate_info(actual_stream_info, expected_stream_info):
assert len(actual_stream_info) == len(expected_stream_info)
for info, expected_info in zip(actual_stream_info, expected_stream_info):
assert info == expected_info
def check_stream_info(cursor, stream_name, expected_stream_info):
stream_info = get_stream_info(cursor, stream_name)
assert len(stream_info) == len(expected_stream_info)
for info, expected_info in zip(stream_info, expected_stream_info):
assert info == expected_info
validate_info(stream_info, expected_stream_info)
def kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, payload_bytes):
decoded_payload = payload_bytes.decode('utf-8')
@ -136,6 +139,8 @@ def kafka_check_vertex_exists_with_topic_and_payload(cursor, topic, payload_byte
cursor, {'topic': f'"{topic}"', 'payload': f'"{decoded_payload}"'})
PULSAR_SERVICE_URL = 'pulsar://127.0.0.1:6650'
def pulsar_default_namespace_topic(topic):
return f'persistent://public/default/{topic}'

View File

@ -16,7 +16,7 @@ from kafka.admin import KafkaAdminClient, NewTopic
import pulsar
import requests
from common import NAME, connect, execute_and_fetch_all
from common import NAME, connect, execute_and_fetch_all, PULSAR_SERVICE_URL
# To run these test locally a running Kafka sever is necessery. The test tries
# to connect on localhost:9092.
@ -73,7 +73,7 @@ def kafka_producer():
@pytest.fixture(scope="function")
def pulsar_client():
yield pulsar.Client('pulsar://127.0.0.1:6650')
yield pulsar.Client(PULSAR_SERVICE_URL)
@pytest.fixture(scope="function")

View File

@ -421,5 +421,24 @@ def test_set_offset(kafka_producer, kafka_topics, connection, transformation):
assert comparison_check("Final Message", res[0])
common.execute_and_fetch_all(cursor, "MATCH (n) DETACH DELETE n")
def test_info_procedure(kafka_topics, connection):
cursor = connection.cursor()
stream_name = 'test_stream'
local = "localhost:9092"
consumer_group = "ConsumerGr"
common.execute_and_fetch_all(
cursor,
f"CREATE KAFKA STREAM {stream_name} "
f"TOPICS {','.join(kafka_topics)} "
f"TRANSFORM pulsar_transform.simple "
f"CONSUMER_GROUP {consumer_group} "
f"BOOTSTRAP_SERVERS '{local}'"
)
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.kafka_stream_info('{stream_name}') YIELD *")
expected_stream_info = [(local, consumer_group, kafka_topics)]
common.validate_info(stream_info, expected_stream_info)
if __name__ == "__main__":
sys.exit(pytest.main([__file__, "-rA"]))

View File

@ -223,6 +223,21 @@ def test_check_stream(
cursor, pulsar_topics[0], message)
def test_info_procedure(pulsar_client, pulsar_topics, connection):
cursor = connection.cursor()
stream_name = 'test_stream'
common.execute_and_fetch_all(
cursor,
f"CREATE PULSAR STREAM {stream_name} "
f"TOPICS {','.join(pulsar_topics)} "
f"TRANSFORM pulsar_transform.simple ",
)
stream_info = common.execute_and_fetch_all(cursor, f"CALL mg.pulsar_stream_info('{stream_name}') YIELD *")
expected_stream_info = [(common.PULSAR_SERVICE_URL, pulsar_topics)]
common.validate_info(stream_info, expected_stream_info)
def test_show_streams(pulsar_client, pulsar_topics, connection):
assert len(pulsar_topics) > 1
cursor = connection.cursor()