Add kafka library and integrate it into memgraph

Summary:
Integrated the kafka library (librdkafka) into memgraph. This version supports
all of the openCypher stream clauses and will only output messages consumed from Kafka.

Depends on D1434

Next steps are persisting stream metadata and transforming messages in order to
store them in the graph.

Reviewers: teon.banek, mtomic, mferencevic, buda

Reviewed By: teon.banek

Subscribers: mferencevic, pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1466
Matija Santl 2018-06-19 14:37:02 +02:00
parent e2f9eb6fa5
commit fa7e214bcf
28 changed files with 923 additions and 110 deletions

View File

@@ -35,6 +35,7 @@ BASE_FLAGS = [
'-I./libs/bzip2',
'-I./libs/zlib',
'-I./libs/rocksdb/include',
'-I./libs/librdkafka/include/librdkafka',
'-I./build/include'
]

View File

@@ -154,6 +154,7 @@ include_directories(SYSTEM ${BZIP2_INCLUDE_DIR})
include_directories(SYSTEM ${ZLIB_INCLUDE_DIR})
include_directories(SYSTEM ${ROCKSDB_INCLUDE_DIR})
include_directories(SYSTEM ${CAPNP_INCLUDE_DIR})
include_directories(SYSTEM ${LIBRDKAFKA_INCLUDE_DIR})
# -----------------------------------------------------------------------------
# openCypher parser -----------------------------------------------------------

View File

@@ -20,15 +20,29 @@ The full openCypher clause for creating a stream is:
```opencypher
CREATE STREAM stream_name AS
LOAD DATA KAFKA 'URI'
WITH TOPIC 'topic'
WITH TRANSFORM 'URI'
[BATCH INTERVAL milliseconds]
[BATCH_INTERVAL milliseconds]
[BATCH_SIZE count]
```
The `WITH TOPIC` parameter specifies the Kafka topic from which we'll stream
data.
The `WITH TRANSFORM` parameter should contain a URI of the transform script.
The `BATCH_INTERVAL` parameter defines the time interval in milliseconds
between two successive stream importing operations.
The `BATCH_SIZE` parameter defines the number of Kafka messages that will be
batched together before import.
If both the `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the
condition that is satisfied first will trigger the batched import.
The default value for `BATCH_INTERVAL` is 100 milliseconds, and the default
value for `BATCH_SIZE` is 10.
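For example, the following query creates a stream that imports a batch as soon
as 20 messages arrive, or after 500 milliseconds, whichever happens first (the
URI, topic, and transform script below are illustrative placeholders):
```opencypher
CREATE STREAM mystream AS
  LOAD DATA KAFKA 'localhost:9092'
  WITH TOPIC 'mytopic'
  WITH TRANSFORM 'http://localhost/transform.py'
  BATCH_INTERVAL 500
  BATCH_SIZE 20
```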
The `DROP` clause deletes a stream:
```opencypher
DROP STREAM stream_name;
```

View File

@@ -233,3 +233,19 @@ set(CAPNP_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnp
set(CAPNP_CXX_EXE ${CMAKE_CURRENT_SOURCE_DIR}/capnproto/local/bin/capnpc-c++
CACHE FILEPATH "Path to capnproto c++ plugin executable" FORCE)
mark_as_advanced(CAPNP_INCLUDE_DIR CAPNP_LIBRARY KJ_LIBRARY CAPNP_EXE CAPNP_CXX_EXE)
# Setup librdkafka.
import_external_library(librdkafka STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka.a
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/include/librdkafka
CMAKE_ARGS -DRDKAFKA_BUILD_STATIC=ON
-DRDKAFKA_BUILD_EXAMPLES=OFF
-DRDKAFKA_BUILD_TESTS=OFF
-DCMAKE_INSTALL_LIBDIR=lib
-DWITH_SSL=ON
# If we want SASL, we need to install it on build machines
-DWITH_SASL=OFF)
import_library(librdkafka++ STATIC
${CMAKE_CURRENT_SOURCE_DIR}/librdkafka/lib/librdkafka++.a
librdkafka-proj)

View File

@@ -127,3 +127,8 @@ tar -xzf capnproto.tar.gz
rm -rf capnproto
mv capnproto-c++-0.6.1 capnproto
rm capnproto.tar.gz
# kafka
kafka_tag="c319b4e987d0bc4fe4f01cf91419d90b62061655" # Mar 8, 2018
# git clone https://github.com/edenhill/librdkafka.git
clone git://deps.memgraph.io/librdkafka.git librdkafka $kafka_tag

View File

@@ -1,7 +1,8 @@
# CMake configuration for the main memgraph library and executable
# add memgraph sub libraries
# add memgraph sub libraries, ordered by dependency
add_subdirectory(utils)
add_subdirectory(integrations)
add_subdirectory(io)
add_subdirectory(telemetry)
@@ -196,7 +197,7 @@ set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
${OPENSSL_LIBRARIES}
${Boost_IOSTREAMS_LIBRARY_RELEASE}
${Boost_SERIALIZATION_LIBRARY_RELEASE}
mg-utils mg-io)
mg-utils mg-io mg-integrations)
if (USE_LTALLOC)
list(APPEND MEMGRAPH_ALL_LIBS ltalloc)
@@ -218,7 +219,7 @@ add_dependencies(memgraph_lib generate_capnp)
# STATIC library used to store key-value pairs
add_library(kvstore_lib STATIC storage/kvstore.cpp)
target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib)
target_link_libraries(kvstore_lib stdc++fs mg-utils rocksdb bzip2 zlib glog gflags)
# STATIC library for dummy key-value storage
add_library(kvstore_dummy_lib STATIC storage/kvstore_dummy.cpp)

View File

@@ -56,6 +56,9 @@ class PrivateBase : public GraphDb {
Storage &storage() override { return *storage_; }
durability::WriteAheadLog &wal() override { return wal_; }
integrations::kafka::Streams &kafka_streams() override {
return kafka_streams_;
}
int WorkerId() const override { return config_.worker_id; }
// Makes a local snapshot from the visibility of accessor
@@ -98,6 +101,7 @@ class PrivateBase : public GraphDb {
durability::WriteAheadLog wal_{config_.worker_id,
config_.durability_directory,
config_.durability_enabled};
integrations::kafka::Streams kafka_streams_;
};
template <template <typename TId> class TMapper>
@@ -445,6 +449,9 @@ PublicBase::~PublicBase() {
GraphDb::Type PublicBase::type() const { return impl_->type(); }
Storage &PublicBase::storage() { return impl_->storage(); }
durability::WriteAheadLog &PublicBase::wal() { return impl_->wal(); }
integrations::kafka::Streams &PublicBase::kafka_streams() {
return impl_->kafka_streams();
}
tx::Engine &PublicBase::tx_engine() { return impl_->tx_engine(); }
ConcurrentIdMapper<Label> &PublicBase::label_mapper() {
return impl_->label_mapper();

View File

@@ -8,6 +8,7 @@
#include "database/storage.hpp"
#include "database/storage_gc.hpp"
#include "durability/wal.hpp"
#include "integrations/kafka/streams.hpp"
#include "io/network/endpoint.hpp"
#include "storage/concurrent_id_mapper.hpp"
#include "storage/types.hpp"
@@ -92,6 +93,7 @@ class GraphDb {
virtual Type type() const = 0;
virtual Storage &storage() = 0;
virtual durability::WriteAheadLog &wal() = 0;
virtual integrations::kafka::Streams &kafka_streams() = 0;
virtual tx::Engine &tx_engine() = 0;
virtual storage::ConcurrentIdMapper<storage::Label> &label_mapper() = 0;
virtual storage::ConcurrentIdMapper<storage::EdgeType>
@@ -149,6 +151,7 @@ class PublicBase : public GraphDb {
Type type() const override;
Storage &storage() override;
durability::WriteAheadLog &wal() override;
integrations::kafka::Streams &kafka_streams() override;
tx::Engine &tx_engine() override;
storage::ConcurrentIdMapper<storage::Label> &label_mapper() override;
storage::ConcurrentIdMapper<storage::EdgeType> &edge_type_mapper() override;

View File

@@ -0,0 +1,7 @@
set(integrations_src_files
kafka/streams.cpp
kafka/consumer.cpp)
add_library(mg-integrations STATIC ${integrations_src_files})
target_link_libraries(mg-integrations Threads::Threads fmt glog gflags
librdkafka++ librdkafka zlib)

View File

@@ -0,0 +1,202 @@
#include "integrations/kafka/consumer.hpp"
#include <chrono>
#include "glog/logging.h"
#include "integrations/kafka/exceptions.hpp"
namespace integrations {
namespace kafka {
using namespace std::chrono_literals;
constexpr int64_t kDefaultBatchIntervalMillis = 100;
constexpr int64_t kDefaultBatchSize = 10;
void Consumer::event_cb(RdKafka::Event &event) {
switch (event.type()) {
case RdKafka::Event::Type::EVENT_ERROR:
LOG(WARNING) << "[Kafka] stream " << info_.stream_name << " ERROR ("
<< RdKafka::err2str(event.err()) << "): " << event.str();
break;
default:
break;
}
}
Consumer::Consumer(const StreamInfo &info) : info_(info) {
std::unique_ptr<RdKafka::Conf> conf(
RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
std::string error;
if (conf->set("event_cb", this, error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("enable.partition.eof", "false", error) !=
RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("bootstrap.servers", info_.stream_uri, error) !=
RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
if (conf->set("group.id", "mg", error) != RdKafka::Conf::CONF_OK) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
consumer_ = std::unique_ptr<RdKafka::KafkaConsumer,
std::function<void(RdKafka::KafkaConsumer *)>>(
RdKafka::KafkaConsumer::create(conf.get(), error),
[this](auto *consumer) {
this->StopConsuming();
consumer->close();
delete consumer;
});
if (!consumer_) {
throw ConsumerFailedToInitializeException(info_.stream_name, error);
}
// Try fetching metadata first and check if topic exists.
RdKafka::ErrorCode err;
RdKafka::Metadata *raw_metadata = nullptr;
err = consumer_->metadata(true, nullptr, &raw_metadata, 1000);
std::unique_ptr<RdKafka::Metadata> metadata(raw_metadata);
if (err != RdKafka::ERR_NO_ERROR) {
throw ConsumerFailedToInitializeException(info_.stream_name,
RdKafka::err2str(err));
}
bool topic_found = false;
for (const auto &topic_metadata : *metadata->topics()) {
if (topic_metadata->topic() == info_.stream_topic) {
topic_found = true;
break;
}
}
if (!topic_found) {
throw TopicNotFoundException(info_.stream_name);
}
err = consumer_->subscribe({info_.stream_topic});
if (err != RdKafka::ERR_NO_ERROR) {
throw ConsumerFailedToInitializeException(info_.stream_name,
RdKafka::err2str(err));
}
}
void Consumer::StopConsuming() {
is_running_.store(false);
if (thread_.joinable()) thread_.join();
}
void Consumer::StartConsuming(
std::experimental::optional<int64_t> batch_limit) {
thread_ = std::thread([this, batch_limit]() {
int64_t batch_count = 0;
is_running_.store(true);
while (is_running_) {
int64_t remaining_timeout_in_ms =
info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
int64_t remaining_size = info_.batch_size.value_or(kDefaultBatchSize);
auto start = std::chrono::system_clock::now();
bool run_batch = true;
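// Consume messages until either the batch size is exhausted or the batch
// interval elapses, whichever happens first.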
while (is_running_ && run_batch && remaining_size-- > 0) {
std::unique_ptr<RdKafka::Message> msg(
consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
run_batch = false;
break;
case RdKafka::ERR_NO_ERROR:
// TODO (msantl): store message to current batch and pass the batch
// to transform
break;
default:
LOG(ERROR) << "Consumer error: " << msg->errstr();
is_running_.store(false);
break;
}
auto now = std::chrono::system_clock::now();
auto took =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
if (took.count() >= remaining_timeout_in_ms) {
break;
}
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
if (batch_limit != std::experimental::nullopt) {
batch_count++;
if (batch_limit <= batch_count) {
is_running_.store(false);
}
}
}
});
}
void Consumer::Start(std::experimental::optional<int64_t> batch_limit) {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
throw ConsumerRunningException(info_.stream_name);
}
StartConsuming(batch_limit);
}
void Consumer::Stop() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (!is_running_) {
throw ConsumerStoppedException(info_.stream_name);
}
StopConsuming();
}
void Consumer::StartIfNotStopped() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (!is_running_) {
StartConsuming(std::experimental::nullopt);
}
}
void Consumer::StopIfNotRunning() {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
StopConsuming();
}
}
StreamInfo Consumer::info() {
info_.is_running = is_running_;
return info_;
}
} // namespace kafka
} // namespace integrations

View File

@@ -0,0 +1,66 @@
#pragma once
#include <atomic>
#include <experimental/optional>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include "rdkafkacpp.h"
namespace integrations {
namespace kafka {
struct StreamInfo {
std::string stream_name;
std::string stream_uri;
std::string stream_topic;
std::string transform_uri;
std::experimental::optional<int64_t> batch_interval_in_ms;
std::experimental::optional<int64_t> batch_size;
bool is_running = false;
};
class Consumer final : public RdKafka::EventCb {
public:
Consumer() = delete;
explicit Consumer(const StreamInfo &info);
Consumer(const Consumer &other) = delete;
Consumer(Consumer &&other) = delete;
Consumer &operator=(const Consumer &other) = delete;
Consumer &operator=(Consumer &&other) = delete;
void Start(std::experimental::optional<int64_t> batch_limit);
void Stop();
void StartIfNotStopped();
void StopIfNotRunning();
StreamInfo info();
private:
void event_cb(RdKafka::Event &event) override;
StreamInfo info_;
std::atomic<bool> is_running_{false};
std::thread thread_;
std::unique_ptr<RdKafka::KafkaConsumer,
std::function<void(RdKafka::KafkaConsumer *)>>
consumer_;
void StopConsuming();
void StartConsuming(std::experimental::optional<int64_t> batch_limit);
};
} // namespace kafka
} // namespace integrations

View File

@@ -0,0 +1,59 @@
#pragma once
#include "utils/exceptions.hpp"
#include <fmt/format.h>
class KafkaStreamException : public utils::BasicException {
using utils::BasicException::BasicException;
};
class StreamExistsException : public KafkaStreamException {
public:
explicit StreamExistsException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} already exists.", stream_name)) {}
};
class StreamDoesntExistException : public KafkaStreamException {
public:
explicit StreamDoesntExistException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} doesn't exist.", stream_name)) {}
};
class ConsumerFailedToInitializeException : public KafkaStreamException {
public:
ConsumerFailedToInitializeException(const std::string &stream_name,
const std::string &error)
: KafkaStreamException(fmt::format(
"Failed to initialize kafka stream {} : {}", stream_name, error)) {}
};
class ConsumerNotAvailableException : public KafkaStreamException {
public:
ConsumerNotAvailableException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} not available", stream_name)) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
ConsumerRunningException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already running", stream_name)) {}
};
class ConsumerStoppedException : public KafkaStreamException {
public:
ConsumerStoppedException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already stopped", stream_name)) {}
};
class TopicNotFoundException : public KafkaStreamException {
public:
TopicNotFoundException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {}, topic not found", stream_name)) {}
};

View File

@@ -0,0 +1,68 @@
#include "integrations/kafka/streams.hpp"
#include "integrations/kafka/exceptions.hpp"
namespace integrations {
namespace kafka {
void Streams::CreateStream(const StreamInfo &info) {
std::lock_guard<std::mutex> g(mutex_);
if (consumers_.find(info.stream_name) != consumers_.end())
throw StreamExistsException(info.stream_name);
consumers_.emplace(info.stream_name, info);
}
void Streams::DropStream(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
consumers_.erase(find_it);
}
void Streams::StartStream(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Start(batch_limit);
}
void Streams::StopStream(const std::string &stream_name) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
find_it->second.Stop();
}
void Streams::StartAllStreams() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StartIfNotStopped();
}
}
void Streams::StopAllStreams() {
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
consumer_kv.second.StopIfNotRunning();
}
}
std::vector<StreamInfo> Streams::ShowStreams() {
std::vector<StreamInfo> streams;
std::lock_guard<std::mutex> g(mutex_);
for (auto &consumer_kv : consumers_) {
streams.emplace_back(consumer_kv.second.info());
}
return streams;
}
} // namespace kafka
} // namespace integrations
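To make the call pattern concrete, here is a minimal usage sketch of the
`Streams` API (not part of the commit; the stream parameters are hypothetical
placeholders, and in memgraph itself these calls are issued by the query
cursors below via `GraphDb::kafka_streams()`):
```cpp
#include "integrations/kafka/streams.hpp"

// Minimal sketch with placeholder values, exercising the API defined above.
void StreamsUsageExample(integrations::kafka::Streams &streams) {
  integrations::kafka::StreamInfo info;
  info.stream_name = "mystream";
  info.stream_uri = "localhost:9092";
  info.stream_topic = "mytopic";
  info.transform_uri = "http://localhost/transform.py";
  info.batch_interval_in_ms = 500;  // optional; the consumer defaults to 100 ms
  info.batch_size = 20;             // optional; the consumer defaults to 10

  streams.CreateStream(info);       // throws StreamExistsException on duplicate names
  streams.StartStream("mystream");  // spawns the consumer thread
  streams.StopStream("mystream");   // joins the consumer thread
  streams.DropStream("mystream");
}
```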

View File

@@ -0,0 +1,38 @@
#pragma once
#include "integrations/kafka/consumer.hpp"
#include <experimental/optional>
#include <mutex>
#include <unordered_map>
namespace integrations {
namespace kafka {
class Streams final {
public:
void CreateStream(const StreamInfo &info);
void DropStream(const std::string &stream_name);
void StartStream(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
void StopStream(const std::string &stream_name);
void StartAllStreams();
void StopAllStreams();
std::vector<StreamInfo> ShowStreams();
private:
std::mutex mutex_;
std::unordered_map<std::string, Consumer> consumers_;
// TODO (msantl): persist stream storage
};
} // namespace kafka
} // namespace integrations

View File

@@ -410,8 +410,10 @@ struct DropUser {
struct CreateStream {
streamName @0 :Text;
streamUri @1 :Tree;
transformUri @2 :Tree;
batchInterval @3 :Tree;
streamTopic @2 :Tree;
transformUri @3 :Tree;
batchIntervalInMs @4 :Tree;
batchSize @5 :Tree;
}
struct DropStream {

View File

@@ -1390,12 +1390,20 @@ void CreateStream::Save(capnp::CreateStream::Builder *builder,
auto stream_uri_builder = builder->getStreamUri();
stream_uri_->Save(&stream_uri_builder, saved_uids);
auto stream_topic_builder = builder->getStreamTopic();
stream_topic_->Save(&stream_topic_builder, saved_uids);
auto transform_uri_builder = builder->getTransformUri();
transform_uri_->Save(&transform_uri_builder, saved_uids);
if (batch_interval_) {
auto batch_interval_builder = builder->getBatchInterval();
batch_interval_->Save(&batch_interval_builder, saved_uids);
if (batch_interval_in_ms_) {
auto batch_interval_builder = builder->getBatchIntervalInMs();
batch_interval_in_ms_->Save(&batch_interval_builder, saved_uids);
}
if (batch_size_) {
auto batch_size_builder = builder->getBatchSize();
batch_size_->Save(&batch_size_builder, saved_uids);
}
}
@@ -1408,16 +1416,28 @@ void CreateStream::Load(const capnp::Tree::Reader &base_reader,
const auto stream_uri_reader = reader.getStreamUri();
stream_uri_ =
dynamic_cast<Expression *>(storage->Load(stream_uri_reader, loaded_uids));
const auto stream_topic_reader = reader.getStreamTopic();
stream_topic_ = dynamic_cast<Expression *>(
storage->Load(stream_topic_reader, loaded_uids));
const auto transform_uri_reader = reader.getTransformUri();
transform_uri_ = dynamic_cast<Expression *>(
storage->Load(transform_uri_reader, loaded_uids));
batch_interval_ = nullptr;
if (reader.hasBatchInterval()) {
const auto batch_interval_reader = reader.getBatchInterval();
batch_interval_ = dynamic_cast<Expression *>(
batch_interval_in_ms_ = nullptr;
if (reader.hasBatchIntervalInMs()) {
const auto batch_interval_reader = reader.getBatchIntervalInMs();
batch_interval_in_ms_ = dynamic_cast<Expression *>(
storage->Load(batch_interval_reader, loaded_uids));
}
batch_size_ = nullptr;
if (reader.hasBatchSize()) {
const auto batch_size_reader = reader.getBatchSize();
batch_size_ = dynamic_cast<Expression *>(
storage->Load(batch_size_reader, loaded_uids));
}
}
CreateStream *CreateStream::Construct(const capnp::CreateStream::Reader &reader,

View File

@@ -42,14 +42,14 @@ namespace query {
#define CLONE_BINARY_EXPRESSION \
auto Clone(AstStorage &storage) const->std::remove_const< \
std::remove_pointer<decltype(this)>::type>::type *override { \
std::remove_pointer<decltype(this)>::type>::type * override { \
return storage.Create< \
std::remove_cv<std::remove_reference<decltype(*this)>::type>::type>( \
expression1_->Clone(storage), expression2_->Clone(storage)); \
}
#define CLONE_UNARY_EXPRESSION \
auto Clone(AstStorage &storage) const->std::remove_const< \
std::remove_pointer<decltype(this)>::type>::type *override { \
std::remove_pointer<decltype(this)>::type>::type * override { \
return storage.Create< \
std::remove_cv<std::remove_reference<decltype(*this)>::type>::type>( \
expression_->Clone(storage)); \
@@ -801,8 +801,8 @@ class SubscriptOperator : public BinaryOperator {
return visitor.PostVisit(*this);
}
CLONE_BINARY_EXPRESSION;
static SubscriptOperator *Construct(
capnp::SubscriptOperator::Reader &reader, AstStorage *storage);
static SubscriptOperator *Construct(capnp::SubscriptOperator::Reader &reader,
AstStorage *storage);
protected:
using BinaryOperator::BinaryOperator;
@@ -813,8 +813,9 @@ class SubscriptOperator : public BinaryOperator {
friend class boost::serialization::access;
SERIALIZE_USING_BASE(BinaryOperator);
template <class TArchive>
friend void boost::serialization::load_construct_data(
TArchive &, SubscriptOperator *, const unsigned int);
friend void boost::serialization::load_construct_data(TArchive &,
SubscriptOperator *,
const unsigned int);
};
class ListSlicingOperator : public Expression {
@@ -3673,8 +3674,9 @@ class CreateStream : public Clause {
CreateStream *Clone(AstStorage &storage) const override {
return storage.Create<CreateStream>(
stream_name_, stream_uri_->Clone(storage),
transform_uri_->Clone(storage),
batch_interval_ ? batch_interval_->Clone(storage) : nullptr);
stream_topic_->Clone(storage), transform_uri_->Clone(storage),
batch_interval_in_ms_ ? batch_interval_in_ms_->Clone(storage) : nullptr,
batch_size_ ? batch_size_->Clone(storage) : nullptr);
}
static CreateStream *Construct(const capnp::CreateStream::Reader &reader,
@@ -3683,18 +3685,23 @@ class CreateStream : public Clause {
std::string stream_name_;
Expression *stream_uri_;
Expression *stream_topic_;
Expression *transform_uri_;
Expression *batch_interval_;
Expression *batch_interval_in_ms_;
Expression *batch_size_;
protected:
explicit CreateStream(int uid) : Clause(uid) {}
CreateStream(int uid, std::string stream_name, Expression *stream_uri,
Expression *transform_uri, Expression *batch_interval)
Expression *stream_topic, Expression *transform_uri,
Expression *batch_interval_in_ms, Expression *batch_size)
: Clause(uid),
stream_name_(std::move(stream_name)),
stream_uri_(stream_uri),
stream_topic_(stream_topic),
transform_uri_(transform_uri),
batch_interval_(batch_interval) {}
batch_interval_in_ms_(batch_interval_in_ms),
batch_size_(batch_size) {}
void Save(capnp::Clause::Builder *builder,
std::vector<int> *saved_uids) override;
@@ -3711,8 +3718,10 @@ class CreateStream : public Clause {
ar &boost::serialization::base_object<Clause>(*this);
ar &stream_name_;
ar &stream_uri_;
ar &stream_topic_;
ar &transform_uri_;
ar &batch_interval_;
ar &batch_interval_in_ms_;
ar &batch_size_;
}
template <class TArchive>

View File

@@ -324,17 +324,30 @@ antlrcpp::Any CypherMainVisitor::visitCreateStream(
throw SyntaxException("Stream URI should be a string literal!");
}
Expression *stream_uri = ctx->streamUri->accept(this);
if (!ctx->streamTopic->StringLiteral()) {
throw SyntaxException("Topic should be a string literal!");
}
Expression *stream_topic = ctx->streamTopic->accept(this);
if (!ctx->transformUri->StringLiteral()) {
throw SyntaxException("Transform URI should be a string literal!");
}
Expression *transform_uri = ctx->transformUri->accept(this);
Expression *batch_interval = nullptr;
Expression *batch_interval_in_ms = nullptr;
if (ctx->batchIntervalOption()) {
batch_interval = ctx->batchIntervalOption()->accept(this);
batch_interval_in_ms = ctx->batchIntervalOption()->accept(this);
}
return storage_.Create<CreateStream>(stream_name, stream_uri, transform_uri,
batch_interval);
Expression *batch_size = nullptr;
if (ctx->batchSizeOption()) {
batch_size = ctx->batchSizeOption()->accept(this);
}
return storage_.Create<CreateStream>(stream_name, stream_uri, stream_topic,
transform_uri, batch_interval_in_ms,
batch_size);
}
/**
@@ -349,6 +362,18 @@ antlrcpp::Any CypherMainVisitor::visitBatchIntervalOption(
return ctx->literal()->accept(this);
}
/**
* @return Expression*
*/
antlrcpp::Any CypherMainVisitor::visitBatchSizeOption(
CypherParser::BatchSizeOptionContext *ctx) {
if (!ctx->literal()->numberLiteral() ||
!ctx->literal()->numberLiteral()->integerLiteral()) {
throw SyntaxException("Batch size should be an integer literal!");
}
return ctx->literal()->accept(this);
}
/**
* @return DropStream*
*/

View File

@@ -195,6 +195,9 @@ class CypherMainVisitor : public antlropencypher::CypherBaseVisitor {
antlrcpp::Any visitBatchIntervalOption(
CypherParser::BatchIntervalOptionContext *ctx) override;
antlrcpp::Any visitBatchSizeOption(
CypherParser::BatchSizeOptionContext *ctx) override;
/**
* @return DropStream*
*/

View File

@@ -300,9 +300,13 @@ dropUser : DROP SP USER SP userName ( SP? ',' SP? userName )* ;
streamName : UnescapedSymbolicName ;
createStream : CREATE SP STREAM SP streamName SP AS SP LOAD SP DATA SP KAFKA SP streamUri=literal SP WITH SP TRANSFORM SP transformUri=literal ( SP batchIntervalOption )? ;
createStream : CREATE SP STREAM SP streamName SP AS SP LOAD SP DATA SP KAFKA SP
streamUri=literal SP WITH SP TOPIC SP streamTopic=literal SP WITH SP TRANSFORM
SP transformUri=literal ( SP batchIntervalOption )? (SP batchSizeOption )? ;
batchIntervalOption : BATCH SP INTERVAL SP literal ;
batchIntervalOption : BATCH_INTERVAL SP literal ;
batchSizeOption : BATCH_SIZE SP literal ;
dropStream : DROP SP STREAM SP streamName ;
@@ -427,8 +431,8 @@ symbolicName : UnescapedSymbolicName
| DATA
| KAFKA
| TRANSFORM
| BATCH
| INTERVAL
| BATCH_SIZE
| BATCH_INTERVAL
| SHOW
| START
| STOP
@@ -559,9 +563,9 @@ KAFKA : ( 'K' | 'k' ) ( 'A' | 'a' ) ( 'F' | 'f' ) ( 'K' | 'k' ) ( 'A' | 'a' ) ;
TRANSFORM : ( 'T' | 't' ) ( 'R' | 'r' ) ( 'A' | 'a' ) ( 'N' | 'n' ) ( 'S' | 's') ( 'F' | 'f' ) ( 'O' | 'o' ) ( 'R' | 'r' ) ( 'M' | 'm' ) ;
BATCH : ( 'B' | 'b' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'C' | 'c' ) ( 'H' | 'h' ) ;
BATCH_SIZE : ( 'B' | 'b' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'C' | 'c' ) ( 'H' | 'h') '_' ( 'S' | 's' ) ( 'I' | 'i' ) ( 'Z' | 'z' ) ( 'E' | 'e' ) ;
INTERVAL : ( 'I' | 'i' ) ( 'N' | 'n' ) ( 'T' | 't' ) ( 'E' | 'e' ) ( 'R' | 'r' ) ( 'V' | 'v' ) ( 'A' | 'a' ) ( 'L' | 'l' ) ;
BATCH_INTERVAL : ( 'B' | 'b' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'C' | 'c' ) ( 'H' | 'h' ) '_' ( 'I' | 'i' ) ( 'N' | 'n' ) ( 'T' | 't' ) ( 'E' | 'e' ) ( 'R' | 'r' ) ( 'V' | 'v' ) ( 'A' | 'a' ) ( 'L' | 'l' ) ;
SHOW : ( 'S' | 's' ) ( 'H' | 'h' ) ( 'O' | 'o' ) ( 'W' | 'w' ) ;
@@ -571,6 +575,8 @@ STOP : ( 'S' | 's' ) ( 'T' | 't' ) ( 'O' | 'o' ) ( 'P' | 'p' ) ;
BATCHES : ( 'B' | 'b' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'C' | 'c' ) ( 'H' | 'h' ) ( 'E' | 'e' ) ( 'S' | 's' ) ;
TOPIC : ( 'T' | 't' ) ( 'O' | 'o' ) ( 'P' | 'p' ) ( 'I' | 'i' ) ( 'C' | 'c' ) ;
UnescapedSymbolicName : IdentifierStart ( IdentifierPart )* ;
/**

View File

@@ -89,7 +89,7 @@ const trie::Trie kKeywords = {
"extract", "any", "none", "single", "true", "false",
"reduce", "user", "password", "alter", "drop", "stream",
"streams", "load", "data", "kafka", "transform", "batch",
"interval", "show", "start", "stop"};
"interval", "show", "start", "stop", "size", "topic"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(std::string(

View File

@@ -21,6 +21,7 @@
#include "distributed/pull_rpc_clients.hpp"
#include "distributed/updates_rpc_clients.hpp"
#include "distributed/updates_rpc_server.hpp"
#include "integrations/kafka/exceptions.hpp"
#include "query/context.hpp"
#include "query/exceptions.hpp"
#include "query/frontend/ast/ast.hpp"
@@ -3871,28 +3872,67 @@ std::unique_ptr<Cursor> DropUser::MakeCursor(
}
CreateStream::CreateStream(std::string stream_name, Expression *stream_uri,
Expression *transform_uri,
Expression *batch_interval)
Expression *stream_topic, Expression *transform_uri,
Expression *batch_interval_in_ms,
Expression *batch_size)
: stream_name_(std::move(stream_name)),
stream_uri_(stream_uri),
stream_topic_(stream_topic),
transform_uri_(transform_uri),
batch_interval_(batch_interval) {}
batch_interval_in_ms_(batch_interval_in_ms),
batch_size_(batch_size) {}
WITHOUT_SINGLE_INPUT(CreateStream)
class CreateStreamCursor : public Cursor {
using StreamInfo = integrations::kafka::StreamInfo;
public:
CreateStreamCursor(const CreateStream &, database::GraphDbAccessor &) {}
CreateStreamCursor(const CreateStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
ExpressionEvaluator evaluator(frame, &ctx, GraphView::OLD);
throw utils::NotYetImplemented("Create Stream");
TypedValue stream_uri = self_.stream_uri()->Accept(evaluator);
TypedValue stream_topic = self_.stream_topic()->Accept(evaluator);
TypedValue transform_uri = self_.transform_uri()->Accept(evaluator);
std::experimental::optional<int64_t> batch_interval_in_ms, batch_size;
if (self_.batch_interval_in_ms()) {
batch_interval_in_ms =
self_.batch_interval_in_ms()->Accept(evaluator).Value<int64_t>();
}
if (self_.batch_size()) {
batch_size = self_.batch_size()->Accept(evaluator).Value<int64_t>();
}
try {
StreamInfo info;
info.stream_name = self_.stream_name();
info.stream_uri = stream_uri.Value<std::string>();
info.stream_topic = stream_topic.Value<std::string>();
info.transform_uri = transform_uri.Value<std::string>();
info.batch_interval_in_ms = batch_interval_in_ms;
info.batch_size = batch_size;
db_.db().kafka_streams().CreateStream(info);
} catch (const KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return false;
}
void Reset() override { throw utils::NotYetImplemented("Create Stream"); }
private:
const CreateStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> CreateStream::MakeCursor(
@@ -3907,17 +3947,27 @@ WITHOUT_SINGLE_INPUT(DropStream)
class DropStreamCursor : public Cursor {
public:
DropStreamCursor(const DropStream &, database::GraphDbAccessor &) {}
DropStreamCursor(const DropStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
throw utils::NotYetImplemented("Drop Stream");
try {
db_.db().kafka_streams().DropStream(self_.stream_name());
} catch (const KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return false;
}
void Reset() override { throw utils::NotYetImplemented("Drop Stream"); }
private:
const DropStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> DropStream::MakeCursor(
@@ -3925,21 +3975,61 @@ std::unique_ptr<Cursor> DropStream::MakeCursor(
return std::make_unique<DropStreamCursor>(*this, db);
}
ShowStreams::ShowStreams(Symbol name_symbol, Symbol uri_symbol,
Symbol topic_symbol, Symbol transform_symbol,
Symbol status_symbol)
: name_symbol_(name_symbol),
uri_symbol_(uri_symbol),
topic_symbol_(topic_symbol),
transform_symbol_(transform_symbol),
status_symbol_(status_symbol) {}
WITHOUT_SINGLE_INPUT(ShowStreams)
std::vector<Symbol> ShowStreams::OutputSymbols(const SymbolTable &) const {
return {name_symbol_, uri_symbol_, topic_symbol_, transform_symbol_,
status_symbol_};
}
class ShowStreamsCursor : public Cursor {
public:
ShowStreamsCursor(const ShowStreams &, database::GraphDbAccessor &) {}
ShowStreamsCursor(const ShowStreams &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
throw utils::NotYetImplemented("Show Streams");
if (!is_initialized_) {
streams_ = db_.db().kafka_streams().ShowStreams();
streams_it_ = streams_.begin();
is_initialized_ = true;
}
if (streams_it_ == streams_.end()) return false;
frame[self_.name_symbol()] = streams_it_->stream_name;
frame[self_.uri_symbol()] = streams_it_->stream_uri;
frame[self_.topic_symbol()] = streams_it_->stream_topic;
frame[self_.transform_symbol()] = streams_it_->transform_uri;
frame[self_.status_symbol()] = streams_it_->is_running;
streams_it_++;
return true;
}
void Reset() override { throw utils::NotYetImplemented("Show Streams"); }
private:
const ShowStreams &self_;
database::GraphDbAccessor &db_;
bool is_initialized_ = false;
using StreamInfo = integrations::kafka::StreamInfo;
std::vector<StreamInfo> streams_;
std::vector<StreamInfo>::iterator streams_it_ = streams_.begin();
};
std::unique_ptr<Cursor> ShowStreams::MakeCursor(
@@ -3957,18 +4047,33 @@ WITHOUT_SINGLE_INPUT(StartStopStream)
class StartStopStreamCursor : public Cursor {
public:
StartStopStreamCursor(const StartStopStream &,
database::GraphDbAccessor &db) {}
StartStopStreamCursor(const StartStopStream &self,
database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
throw utils::NotYetImplemented("Start/Stop Stream");
try {
if (self_.is_start()) {
db_.db().kafka_streams().StartStream(self_.stream_name());
} else {
db_.db().kafka_streams().StopStream(self_.stream_name());
}
} catch (const KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return false;
}
void Reset() override { throw utils::NotYetImplemented("Start/Stop Stream"); }
private:
const StartStopStream &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> StartStopStream::MakeCursor(
@@ -3982,20 +4087,35 @@ WITHOUT_SINGLE_INPUT(StartStopAllStreams)
class StartStopAllStreamsCursor : public Cursor {
public:
StartStopAllStreamsCursor(const StartStopAllStreams &,
database::GraphDbAccessor &) {}
StartStopAllStreamsCursor(const StartStopAllStreams &self,
database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
throw utils::NotYetImplemented("Start/Stop All Streams");
try {
if (self_.is_start()) {
db_.db().kafka_streams().StartAllStreams();
} else {
db_.db().kafka_streams().StopAllStreams();
}
} catch (const KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
return false;
}
void Reset() override {
throw utils::NotYetImplemented("Start/Stop All Streams");
}
private:
const StartStopAllStreams &self_;
database::GraphDbAccessor &db_;
};
std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(

View File

@@ -2527,6 +2527,14 @@ by having only one result from each worker.")
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(stream-topic "Expression *"
:reader t
:save-fun #'save-pointer
:load-fun #'load-pointer
:capnp-type "Ast.Tree"
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(transform-uri "Expression *"
:reader t
:save-fun #'save-pointer
@@ -2535,7 +2543,15 @@ :capnp-init nil
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(batch-interval "Expression *"
(batch-interval-in-ms "Expression *"
:reader t
:save-fun #'save-pointer
:load-fun #'load-pointer
:capnp-type "Ast.Tree"
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(batch-size "Expression *"
:reader t
:save-fun #'save-pointer
:load-fun #'load-pointer
@@ -2547,8 +2563,9 @@ by having only one result from each worker.")
"Creates a new stream with given parameters that imports data.")
(:public
#>cpp
CreateStream(std::string stream_name, Expression* stream_uri,
Expression* transform_uri, Expression* batch_interval);
CreateStream(std::string stream_name, Expression *stream_uri,
Expression *stream_topic, Expression *transform_uri,
Expression *batch_interval_in_ms, Expression *batch_size);
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
@@ -2587,15 +2604,21 @@ by having only one result from each worker.")
(:serialize :boost :capnp))
(lcp:define-class show-streams (logical-operator)
()
((name-symbol "Symbol" :reader t)
(uri-symbol "Symbol" :reader t)
(topic-symbol "Symbol" :reader t)
(transform-symbol "Symbol" :reader t)
(status-symbol "Symbol" :reader t))
(:documentation
"Shows all streams, started and stopped, that were configured.")
(:public
#>cpp
ShowStreams() {}
ShowStreams(Symbol name_symbol, Symbol uri_symbol, Symbol topic_symbol,
Symbol transform_symbol, Symbol status_symbol);
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> OutputSymbols(const SymbolTable &) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
return {};
}
@@ -2604,6 +2627,8 @@ by having only one result from each worker.")
std::shared_ptr<LogicalOperator> input() const override;
void set_input(std::shared_ptr<LogicalOperator>) override;
cpp<#)
(:private
#>cpp ShowStreams() {} cpp<#)
(:serialize :boost :capnp))
(lcp:define-class start-stop-stream (logical-operator)

View File

@@ -196,7 +196,8 @@ class RuleBasedPlanner {
DCHECK(!input_op) << "Unexpected operator before CreateStream";
input_op = std::make_unique<plan::CreateStream>(
create_stream->stream_name_, create_stream->stream_uri_,
create_stream->transform_uri_, create_stream->batch_interval_);
create_stream->stream_topic_, create_stream->transform_uri_,
create_stream->batch_interval_in_ms_, create_stream->batch_size_);
} else if (auto *drop_stream =
dynamic_cast<query::DropStream *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before DropStream";
@@ -204,7 +205,14 @@
std::make_unique<plan::DropStream>(drop_stream->stream_name_);
} else if (dynamic_cast<query::ShowStreams *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before ShowStreams";
input_op = std::make_unique<plan::ShowStreams>();
// Create symbols for ShowStreams results
auto &symbol_table = context.symbol_table;
input_op = std::make_unique<plan::ShowStreams>(
symbol_table.CreateSymbol("name", false),
symbol_table.CreateSymbol("uri", false),
symbol_table.CreateSymbol("topic", false),
symbol_table.CreateSymbol("transform", false),
symbol_table.CreateSymbol("is running", false));
} else if (auto *start_stop_stream =
dynamic_cast<query::StartStopStream *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before StartStopStream";

View File

@@ -134,7 +134,7 @@ add_unit_test(interpreter.cpp)
target_link_libraries(${test_prefix}interpreter memgraph_lib kvstore_dummy_lib)
add_unit_test(kvstore.cpp)
target_link_libraries(${test_prefix}kvstore gtest gtest_main memgraph_lib kvstore_lib)
target_link_libraries(${test_prefix}kvstore kvstore_lib memgraph_lib glog)
add_unit_test(metrics.cpp)
target_link_libraries(${test_prefix}metrics memgraph_lib kvstore_dummy_lib)
@@ -161,7 +161,7 @@ add_unit_test(pod_buffer.cpp)
target_link_libraries(${test_prefix}pod_buffer memgraph_lib kvstore_dummy_lib)
add_unit_test(property_value_store.cpp)
target_link_libraries(${test_prefix}property_value_store memgraph_lib kvstore_lib)
target_link_libraries(${test_prefix}property_value_store kvstore_lib memgraph_lib)
add_unit_test(query_cost_estimator.cpp)
target_link_libraries(${test_prefix}query_cost_estimator memgraph_lib kvstore_dummy_lib)

View File

@@ -1963,8 +1963,10 @@ TYPED_TEST(CypherMainVisitorTest, DropUser) {
TYPED_TEST(CypherMainVisitorTest, CreateStream) {
auto check_create_stream = [](
std::string input, const std::string &stream_name,
const std::string &stream_uri, const std::string &transform_uri,
std::experimental::optional<int64_t> batch_interval) {
const std::string &stream_uri, const std::string &stream_topic,
const std::string &transform_uri,
std::experimental::optional<int64_t> batch_interval_in_ms,
std::experimental::optional<int64_t> batch_size) {
TypeParam ast_generator(input);
auto *query = ast_generator.query_;
ASSERT_TRUE(query->single_query_);
@@ -1977,47 +1979,95 @@ TYPED_TEST(CypherMainVisitorTest, CreateStream) {
ASSERT_TRUE(create_stream->stream_uri_);
CheckLiteral(ast_generator.context_, create_stream->stream_uri_,
TypedValue(stream_uri));
ASSERT_TRUE(create_stream->stream_topic_);
CheckLiteral(ast_generator.context_, create_stream->stream_topic_,
TypedValue(stream_topic));
ASSERT_TRUE(create_stream->transform_uri_);
CheckLiteral(ast_generator.context_, create_stream->transform_uri_,
TypedValue(transform_uri));
if (batch_interval) {
ASSERT_TRUE(create_stream->batch_interval_);
CheckLiteral(ast_generator.context_, create_stream->batch_interval_,
TypedValue(*batch_interval));
if (batch_interval_in_ms) {
ASSERT_TRUE(create_stream->batch_interval_in_ms_);
CheckLiteral(ast_generator.context_, create_stream->batch_interval_in_ms_,
TypedValue(*batch_interval_in_ms));
} else {
EXPECT_EQ(create_stream->batch_interval_, nullptr);
EXPECT_EQ(create_stream->batch_interval_in_ms_, nullptr);
}
if (batch_size) {
ASSERT_TRUE(create_stream->batch_size_);
CheckLiteral(ast_generator.context_, create_stream->batch_size_,
TypedValue(*batch_size));
} else {
EXPECT_EQ(create_stream->batch_size_, nullptr);
}
};
check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py'",
"strim", "localhost", "localhost/test.py", std::experimental::nullopt);
"strim", "localhost", "tropika", "localhost/test.py",
std::experimental::nullopt, std::experimental::nullopt);
check_create_stream(
"CreaTE StreaM strim AS LOad daTA KAFKA 'localhost' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH inTErvAL 168",
"strim", "localhost", "localhost/test.py", 168);
"WitH TopIC 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH_inTErvAL 168",
"strim", "localhost", "tropika", "localhost/test.py", 168,
std::experimental::nullopt);
check_create_stream(
"CreaTE StreaM strim AS LOad daTA KAFKA 'localhost' "
"WITH TopIC 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH_SizE 17",
"strim", "localhost", "tropika", "localhost/test.py",
std::experimental::nullopt, 17);
check_create_stream(
"CreaTE StreaM strim AS LOad daTA KAFKA 'localhost' "
"WitH TOPic 'tropika' "
"WITH TRAnsFORM 'localhost/test.py' bAtCH_inTErvAL 168 Batch_SIze 17",
"strim", "localhost", "tropika", "localhost/test.py", 168, 17);
EXPECT_THROW(check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA 'localhost' "
"WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 'jedan' ",
"strim", "localhost", "localhost/test.py", 168),
"WITH TRANSFORM 'localhost/test.py' BATCH_INTERVAL 'jedan' ",
"strim", "localhost", "tropika", "localhost/test.py", 168,
std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py' BATCH_SIZE 'jedan' ",
"strim", "localhost", "tropika", "localhost/test.py",
std::experimental::nullopt, 17),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM 123 AS LOAD DATA KAFKA 'localhost' "
"WITH TRANSFORM 'localhost/test.py' BATCH INTERVAL 168 ",
"strim", "localhost", "localhost/test.py", 168),
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py' BATCH_INTERVAL 168 ",
"strim", "localhost", "tropika", "localhost/test.py", 168,
std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA localhost "
"WITH TOPIC 'tropika' "
"WITH TRANSFORM 'localhost/test.py'",
"strim", "localhost", "tropika", "localhost/test.py",
std::experimental::nullopt, std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(
check_create_stream("CREATE STREAM strim AS LOAD DATA KAFKA localhost "
"WITH TRANSFORM 'localhost/test.py'",
"strim", "localhost", "localhost/test.py",
std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA 'localhost' "
"WITH TRANSFORM localhost/test.py BATCH INTERVAL 168 ",
"strim", "localhost", "localhost/test.py", 168),
"WITH TOPIC 2"
"WITH TRANSFORM localhost/test.py BATCH_INTERVAL 168 ",
"strim", "localhost", "tropika", "localhost/test.py", 168,
std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(check_create_stream(
"CREATE STREAM strim AS LOAD DATA KAFKA 'localhost' "
"WITH TOPIC 'tropika'"
"WITH TRANSFORM localhost/test.py BATCH_INTERVAL 168 ",
"strim", "localhost", "tropika", "localhost/test.py", 168,
std::experimental::nullopt),
SyntaxException);
}

View File

@@ -578,9 +578,11 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
#define ALTER_USER(username, password) \
storage.Create<query::ModifyUser>((username), LITERAL(password), false)
#define DROP_USER(usernames) storage.Create<query::DropUser>((usernames))
#define CREATE_STREAM(stream_name, stream_uri, transform_uri, batch_interval) \
storage.Create<query::CreateStream>((stream_name), LITERAL(stream_uri), \
LITERAL(transform_uri), batch_interval)
#define CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri, \
batch_interval, batch_size) \
storage.Create<query::CreateStream>( \
(stream_name), LITERAL(stream_uri), LITERAL(stream_topic), \
LITERAL(transform_uri), (batch_interval), (batch_size))
#define DROP_STREAM(stream_name) \
storage.Create<query::DropStream>((stream_name))
#define SHOW_STREAMS storage.Create<query::ShowStreams>()

View File

@@ -486,39 +486,55 @@ class ExpectPullRemoteOrderBy : public OpChecker<PullRemoteOrderBy> {
class ExpectCreateStream : public OpChecker<CreateStream> {
public:
ExpectCreateStream(std::string stream_name, query::Expression *stream_uri,
query::Expression *stream_topic,
query::Expression *transform_uri,
query::Expression *batch_interval)
query::Expression *batch_interval_in_ms,
query::Expression *batch_size)
: stream_name_(stream_name),
stream_uri_(stream_uri),
stream_topic_(stream_topic),
transform_uri_(transform_uri),
batch_interval_(batch_interval) {}
batch_interval_in_ms_(batch_interval_in_ms),
batch_size_(batch_size) {}
void ExpectOp(CreateStream &create_stream, const SymbolTable &) override {
EXPECT_EQ(create_stream.stream_name(), stream_name_);
// TODO: Proper expression equality
EXPECT_EQ(typeid(create_stream.stream_uri()).hash_code(),
typeid(stream_uri_).hash_code());
EXPECT_EQ(typeid(create_stream.stream_topic()).hash_code(),
typeid(stream_topic_).hash_code());
EXPECT_EQ(typeid(create_stream.transform_uri()).hash_code(),
typeid(transform_uri_).hash_code());
if (batch_interval_ && create_stream.batch_interval()) {
EXPECT_EQ(typeid(create_stream.batch_interval()).hash_code(),
typeid(batch_interval_).hash_code());
if (batch_interval_in_ms_ && create_stream.batch_interval_in_ms()) {
EXPECT_EQ(typeid(create_stream.batch_interval_in_ms()).hash_code(),
typeid(batch_interval_in_ms_).hash_code());
} else {
EXPECT_TRUE(batch_interval_ == nullptr &&
create_stream.batch_interval() == nullptr);
EXPECT_TRUE(batch_interval_in_ms_ == nullptr &&
create_stream.batch_interval_in_ms() == nullptr);
}
if (batch_size_ && create_stream.batch_size()) {
EXPECT_EQ(typeid(create_stream.batch_size()).hash_code(),
typeid(batch_size_).hash_code());
} else {
EXPECT_TRUE(batch_size_ == nullptr &&
create_stream.batch_size() == nullptr);
}
}
private:
std::string stream_name_;
query::Expression *stream_uri_;
query::Expression *stream_topic_;
query::Expression *transform_uri_;
query::Expression *batch_interval_;
query::Expression *batch_interval_in_ms_;
query::Expression *batch_size_;
};
class ExpectDropStream : public OpChecker<DropStream> {
public:
ExpectDropStream(std::string stream_name) : stream_name_(stream_name) {}
explicit ExpectDropStream(std::string stream_name)
: stream_name_(stream_name) {}
void ExpectOp(DropStream &drop_stream, const SymbolTable &) override {
EXPECT_EQ(drop_stream.stream_name(), stream_name_);
@@ -558,7 +574,7 @@ class ExpectStartStopStream : public OpChecker<StartStopStream> {
class ExpectStartStopAllStreams : public OpChecker<StartStopAllStreams> {
public:
ExpectStartStopAllStreams(bool is_start) : is_start_(is_start) {}
explicit ExpectStartStopAllStreams(bool is_start) : is_start_(is_start) {}
void ExpectOp(StartStopAllStreams &start_stop_all_streams,
const SymbolTable &) override {
@@ -2347,34 +2363,73 @@ TYPED_TEST(TestPlanner, DropUser) {
TYPED_TEST(TestPlanner, CreateStream) {
std::string stream_name("kafka"), stream_uri("localhost:1234"),
transform_uri("localhost:1234/file.py");
int64_t batch_interval = 100;
stream_topic("tropik"), transform_uri("localhost:1234/file.py");
int64_t batch_interval_in_ms = 100;
int64_t batch_size = 10;
{
FakeDbAccessor dba;
AstStorage storage;
QUERY(SINGLE_QUERY(
CREATE_STREAM(stream_name, stream_uri, transform_uri, nullptr)));
auto expected = ExpectCreateStream(stream_name, LITERAL(stream_uri),
LITERAL(transform_uri), nullptr);
QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
transform_uri, nullptr, nullptr)));
auto expected = ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), nullptr, nullptr);
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed = ExpectDistributed(
MakeCheckers(ExpectCreateStream(stream_name, LITERAL(stream_uri),
LITERAL(transform_uri), nullptr)));
auto expected_distributed =
ExpectDistributed(MakeCheckers(ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), nullptr, nullptr)));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
{
FakeDbAccessor dba;
AstStorage storage;
QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, transform_uri,
LITERAL(batch_interval))));
QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
transform_uri,
LITERAL(batch_interval_in_ms), nullptr)));
auto expected = ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), LITERAL(batch_interval_in_ms), nullptr);
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed =
ExpectDistributed(MakeCheckers(ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), LITERAL(batch_interval_in_ms), nullptr)));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
{
FakeDbAccessor dba;
AstStorage storage;
QUERY(SINGLE_QUERY(CREATE_STREAM(stream_name, stream_uri, stream_topic,
transform_uri, nullptr,
LITERAL(batch_size))));
auto expected = ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), nullptr, LITERAL(batch_size));
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed =
ExpectDistributed(MakeCheckers(ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), nullptr, LITERAL(batch_size))));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
{
FakeDbAccessor dba;
AstStorage storage;
QUERY(SINGLE_QUERY(
CREATE_STREAM(stream_name, stream_uri, stream_topic, transform_uri,
LITERAL(batch_interval_in_ms), LITERAL(batch_size))));
auto expected =
ExpectCreateStream(stream_name, LITERAL(stream_uri),
LITERAL(transform_uri), LITERAL(batch_interval));
LITERAL(stream_topic), LITERAL(transform_uri),
LITERAL(batch_interval_in_ms), LITERAL(batch_size));
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed = ExpectDistributed(MakeCheckers(
ExpectCreateStream(stream_name, LITERAL(stream_uri),
LITERAL(transform_uri), LITERAL(batch_interval))));
auto expected_distributed =
ExpectDistributed(MakeCheckers(ExpectCreateStream(
stream_name, LITERAL(stream_uri), LITERAL(stream_topic),
LITERAL(transform_uri), LITERAL(batch_interval_in_ms),
LITERAL(batch_size))));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
}