Add test stream clause

Summary:
Added test stream functionality. When a stream is configured, it will try to
consume messages from a kafka topic and return them back to the user.
For now, the messages aren't transformed, so it just returns the payload string.

Depends on D1466

Next steps are persisting stream metadata and transforming messages in order to
store them in the graph.

Reviewers: teon.banek, mtomic

Reviewed By: teon.banek

Subscribers: pullbot, buda

Differential Revision: https://phabricator.memgraph.io/D1474
This commit is contained in:
Matija Santl 2018-07-03 13:57:53 +02:00
parent c4f51d87f8
commit a026c4c764
28 changed files with 489 additions and 64 deletions

View File

@ -58,9 +58,24 @@ You can also start/stop streams with the `START` and `STOP` clauses:
START STREAM stream_name [LIMIT count BATCHES];
STOP STREAM stream_name;
```
A stream needs to be stopped in order to start it and it needs to be started in
order to stop it. Starting a started or stopping a stopped stream will not
affect that stream.
There are also convenience clauses to start and stop all streams:
```opencypher
START ALL STREAMS;
STOP ALL STREAMS;
```
Before the actual import, you can also test the stream with the `TEST
STREAM` clause:
```opencypher
TEST STREAM stream_name [LIMIT count BATCHES];
```
When a stream is tested, data extraction and transformation occurs, but no
output is inserted in the graph.
A stream needs to be stopped in order to test it. When the batch limit is
omitted, `TEST STREAM` will run for only one batch by default.

View File

@ -13,6 +13,7 @@ using namespace std::chrono_literals;
constexpr int64_t kDefaultBatchIntervalMillis = 100;
constexpr int64_t kDefaultBatchSize = 10;
constexpr int64_t kDefaultTestBatchLimit = 1;
void Consumer::event_cb(RdKafka::Event &event) {
switch (event.type()) {
@ -96,60 +97,72 @@ void Consumer::StopConsuming() {
}
void Consumer::StartConsuming(
std::experimental::optional<int64_t> batch_limit) {
thread_ = std::thread([this, batch_limit]() {
std::experimental::optional<int64_t> limit_batches) {
thread_ = std::thread([this, limit_batches]() {
int64_t batch_count = 0;
is_running_.store(true);
while (is_running_) {
int64_t remaining_timeout_in_ms =
info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
int64_t remaining_size = info_.batch_size.value_or(kDefaultBatchSize);
auto start = std::chrono::system_clock::now();
bool run_batch = true;
while (is_running_ && run_batch && remaining_size-- > 0) {
std::unique_ptr<RdKafka::Message> msg(
consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
run_batch = false;
break;
case RdKafka::ERR_NO_ERROR:
// TODO (msantl): store message to current batch and pass the batch
// to transform
break;
default:
LOG(ERROR) << "Consumer error: " << msg->errstr();
is_running_.store(false);
break;
}
auto now = std::chrono::system_clock::now();
auto took =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
if (took.count() >= remaining_timeout_in_ms) {
break;
}
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
if (batch_limit != std::experimental::nullopt) {
batch_count++;
if (batch_limit <= batch_count) {
auto batch = this->GetBatch();
// TODO (msantl): transform the batch
if (limit_batches != std::experimental::nullopt) {
if (limit_batches <= ++batch_count) {
is_running_.store(false);
break;
}
}
}
});
}
void Consumer::Start(std::experimental::optional<int64_t> batch_limit) {
std::vector<std::unique_ptr<RdKafka::Message>> Consumer::GetBatch() {
std::vector<std::unique_ptr<RdKafka::Message>> batch;
bool run_batch = false;
auto start = std::chrono::system_clock::now();
int64_t remaining_timeout_in_ms =
info_.batch_interval_in_ms.value_or(kDefaultBatchIntervalMillis);
int64_t batch_size = info_.batch_size.value_or(kDefaultBatchSize);
batch.reserve(batch_size);
for (int64_t i = 0; i < batch_size; ++i) {
std::unique_ptr<RdKafka::Message> msg(
consumer_->consume(remaining_timeout_in_ms));
switch (msg->err()) {
case RdKafka::ERR__TIMED_OUT:
run_batch = false;
break;
case RdKafka::ERR_NO_ERROR:
batch.emplace_back(std::move(msg));
break;
default:
LOG(ERROR) << "[Kafka] Consumer error: " << msg->errstr();
run_batch = false;
is_running_.store(false);
break;
}
if (!run_batch) {
break;
}
auto now = std::chrono::system_clock::now();
auto took =
std::chrono::duration_cast<std::chrono::milliseconds>(now - start);
if (took.count() >= remaining_timeout_in_ms) {
break;
}
remaining_timeout_in_ms = remaining_timeout_in_ms - took.count();
start = now;
}
return batch;
}
void Consumer::Start(std::experimental::optional<int64_t> limit_batches) {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
@ -158,7 +171,7 @@ void Consumer::Start(std::experimental::optional<int64_t> batch_limit) {
throw ConsumerRunningException(info_.stream_name);
}
StartConsuming(batch_limit);
StartConsuming(limit_batches);
}
void Consumer::Stop() {
@ -193,6 +206,35 @@ void Consumer::StopIfNotRunning() {
}
}
std::vector<std::string> Consumer::Test(
std::experimental::optional<int64_t> limit_batches) {
if (!consumer_) {
throw ConsumerNotAvailableException(info_.stream_name);
}
if (is_running_) {
throw ConsumerRunningException(info_.stream_name);
}
int64_t num_of_batches = limit_batches.value_or(kDefaultTestBatchLimit);
std::vector<std::string> results;
is_running_.store(true);
for (int64_t i = 0; i < num_of_batches; ++i) {
auto batch = GetBatch();
// TODO (msantl): transform the batch
for (auto &result : batch) {
results.push_back(
std::string(reinterpret_cast<char *>(result->payload())));
}
}
is_running_.store(false);
return results;
}
StreamInfo Consumer::info() {
info_.is_running = is_running_;
return info_;

View File

@ -35,7 +35,7 @@ class Consumer final : public RdKafka::EventCb {
Consumer &operator=(const Consumer &other) = delete;
Consumer &operator=(Consumer &&other) = delete;
void Start(std::experimental::optional<int64_t> batch_limit);
void Start(std::experimental::optional<int64_t> limit_batches);
void Stop();
@ -43,6 +43,9 @@ class Consumer final : public RdKafka::EventCb {
void StopIfNotRunning();
std::vector<std::string> Test(
std::experimental::optional<int64_t> limit_batches);
StreamInfo info();
private:
@ -59,7 +62,9 @@ class Consumer final : public RdKafka::EventCb {
void StopConsuming();
void StartConsuming(std::experimental::optional<int64_t> batch_limit);
void StartConsuming(std::experimental::optional<int64_t> limit_batches);
std::vector<std::unique_ptr<RdKafka::Message>> GetBatch();
};
} // namespace kafka

View File

@ -32,28 +32,28 @@ class ConsumerFailedToInitializeException : public KafkaStreamException {
class ConsumerNotAvailableException : public KafkaStreamException {
public:
ConsumerNotAvailableException(const std::string &stream_name)
explicit ConsumerNotAvailableException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} not available", stream_name)) {}
};
class ConsumerRunningException : public KafkaStreamException {
public:
ConsumerRunningException(const std::string &stream_name)
explicit ConsumerRunningException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already running", stream_name)) {}
};
class ConsumerStoppedException : public KafkaStreamException {
public:
ConsumerStoppedException(const std::string &stream_name)
explicit ConsumerStoppedException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {} is already stopped", stream_name)) {}
};
class TopicNotFoundException : public KafkaStreamException {
public:
TopicNotFoundException(const std::string &stream_name)
explicit TopicNotFoundException(const std::string &stream_name)
: KafkaStreamException(
fmt::format("Kafka stream {}, topic not found", stream_name)) {}
};

View File

@ -64,5 +64,16 @@ std::vector<StreamInfo> Streams::ShowStreams() {
return streams;
}
std::vector<std::string> Streams::TestStream(
const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit) {
std::lock_guard<std::mutex> g(mutex_);
auto find_it = consumers_.find(stream_name);
if (find_it == consumers_.end())
throw StreamDoesntExistException(stream_name);
return find_it->second.Test(batch_limit);
}
} // namespace kafka
} // namespace integrations

View File

@ -27,6 +27,11 @@ class Streams final {
std::vector<StreamInfo> ShowStreams();
std::vector<std::string> TestStream(
const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
private:
std::mutex mutex_;
std::unordered_map<std::string, Consumer> consumers_;

View File

@ -139,6 +139,7 @@ struct Clause {
showStreams @17 :ShowStreams;
startStopStream @18 :StartStopStream;
startStopAllStreams @19 :StartStopAllStreams;
testStream @20 :TestStream;
}
}
@ -431,3 +432,9 @@ struct StartStopStream {
struct StartStopAllStreams {
isStart @0 :Bool;
}
struct TestStream {
streamName @0 :Text;
limitBatches @1: Tree;
}

View File

@ -1312,6 +1312,10 @@ Clause *Clause::Construct(const capnp::Clause::Reader &reader,
auto ssas_reader = reader.getStartStopAllStreams();
return StartStopAllStreams::Construct(ssas_reader, storage);
}
case capnp::Clause::TEST_STREAM: {
auto ts_reader = reader.getTestStream();
return TestStream::Construct(ts_reader, storage);
}
}
}
@ -1554,6 +1558,41 @@ StartStopAllStreams *StartStopAllStreams::Construct(
return storage->Create<StartStopAllStreams>();
}
// TestStream.
void TestStream::Save(capnp::Clause::Builder *builder,
std::vector<int> *saved_uids) {
Clause::Save(builder, saved_uids);
auto test_builder = builder->initTestStream();
TestStream::Save(&test_builder, saved_uids);
}
void TestStream::Save(capnp::TestStream::Builder *builder,
std::vector<int> *saved_uids) {
builder->setStreamName(stream_name_);
if (limit_batches_) {
auto limit_batches_builder = builder->getLimitBatches();
limit_batches_->Save(&limit_batches_builder, saved_uids);
}
}
void TestStream::Load(const capnp::Tree::Reader &base_reader,
AstStorage *storage, std::vector<int> *loaded_uids) {
Clause::Load(base_reader, storage, loaded_uids);
auto reader = base_reader.getClause().getTestStream();
stream_name_ = reader.getStreamName();
limit_batches_ = nullptr;
if (reader.hasLimitBatches()) {
const auto limit_batches_reader = reader.getLimitBatches();
limit_batches_ = dynamic_cast<Expression *>(
storage->Load(limit_batches_reader, loaded_uids));
}
}
TestStream *TestStream::Construct(const capnp::TestStream::Reader &reader,
AstStorage *storage) {
return storage->Create<TestStream>();
}
// Delete.
void Delete::Save(capnp::Clause::Builder *builder,
std::vector<int> *saved_uids) {
@ -2677,3 +2716,4 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::DropStream);
BOOST_CLASS_EXPORT_IMPLEMENT(query::ShowStreams);
BOOST_CLASS_EXPORT_IMPLEMENT(query::StartStopStream);
BOOST_CLASS_EXPORT_IMPLEMENT(query::StartStopAllStreams);
BOOST_CLASS_EXPORT_IMPLEMENT(query::TestStream);

View File

@ -3911,6 +3911,56 @@ class StartStopAllStreams : public Clause {
const unsigned int);
};
class TestStream : public Clause {
friend class AstStorage;
public:
DEFVISITABLE(TreeVisitor<TypedValue>);
DEFVISITABLE(HierarchicalTreeVisitor);
TestStream *Clone(AstStorage &storage) const override {
return storage.Create<TestStream>(
stream_name_,
limit_batches_ ? limit_batches_->Clone(storage) : nullptr);
}
static TestStream *Construct(const capnp::TestStream::Reader &reader,
AstStorage *storage);
using Clause::Save;
std::string stream_name_;
Expression *limit_batches_;
protected:
explicit TestStream(int uid) : Clause(uid) {}
TestStream(int uid, std::string stream_name, Expression *limit_batches)
: Clause(uid),
stream_name_(std::move(stream_name)),
limit_batches_(limit_batches) {}
void Save(capnp::Clause::Builder *builder,
std::vector<int> *saved_uids) override;
virtual void Save(capnp::TestStream::Builder *builder,
std::vector<int> *saved_uids);
void Load(const capnp::Tree::Reader &base_reader, AstStorage *storage,
std::vector<int> *loaded_uids) override;
private:
friend class boost::serialization::access;
template <class TArchive>
void serialize(TArchive &ar, const unsigned int) {
ar &boost::serialization::base_object<Clause>(*this);
ar &stream_name_;
ar &limit_batches_;
}
template <class TArchive>
friend void boost::serialization::load_construct_data(TArchive &,
TestStream *,
const unsigned int);
};
#undef CLONE_BINARY_EXPRESSION
#undef CLONE_UNARY_EXPRESSION
#undef SERIALIZE_USING_BASE
@ -3993,6 +4043,7 @@ LOAD_AND_CONSTRUCT(query::DropStream, 0);
LOAD_AND_CONSTRUCT(query::ShowStreams, 0);
LOAD_AND_CONSTRUCT(query::StartStopStream, 0);
LOAD_AND_CONSTRUCT(query::StartStopAllStreams, 0);
LOAD_AND_CONSTRUCT(query::TestStream, 0);
} // namespace boost::serialization
@ -4061,3 +4112,4 @@ BOOST_CLASS_EXPORT_KEY(query::DropStream);
BOOST_CLASS_EXPORT_KEY(query::ShowStreams);
BOOST_CLASS_EXPORT_KEY(query::StartStopStream);
BOOST_CLASS_EXPORT_KEY(query::StartStopAllStreams);
BOOST_CLASS_EXPORT_KEY(query::TestStream);

View File

@ -68,6 +68,7 @@ class DropStream;
class ShowStreams;
class StartStopStream;
class StartStopAllStreams;
class TestStream;
using TreeCompositeVisitor = ::utils::CompositeVisitor<
Query, SingleQuery, CypherUnion, NamedExpression, OrOperator, XorOperator,
@ -85,7 +86,7 @@ using TreeLeafVisitor =
::utils::LeafVisitor<Identifier, PrimitiveLiteral, ParameterLookup,
CreateIndex, ModifyUser, DropUser, CreateStream,
DropStream, ShowStreams, StartStopStream,
StartStopAllStreams>;
StartStopAllStreams, TestStream>;
class HierarchicalTreeVisitor : public TreeCompositeVisitor,
public TreeLeafVisitor {
@ -109,7 +110,7 @@ using TreeVisitor = ::utils::Visitor<
Create, Match, Return, With, Pattern, NodeAtom, EdgeAtom, Delete, Where,
SetProperty, SetProperties, SetLabels, RemoveProperty, RemoveLabels, Merge,
Unwind, Identifier, PrimitiveLiteral, CreateIndex, ModifyUser, DropUser,
CreateStream, DropStream, ShowStreams, StartStopStream,
StartStopAllStreams>;
CreateStream, DropStream, ShowStreams, StartStopStream, StartStopAllStreams,
TestStream>;
} // namespace query

View File

@ -142,7 +142,8 @@ antlrcpp::Any CypherMainVisitor::visitSingleQuery(
dynamic_cast<DropStream *>(clause) ||
dynamic_cast<ShowStreams *>(clause) ||
dynamic_cast<StartStopStream *>(clause) ||
dynamic_cast<StartStopAllStreams *>(clause)) {
dynamic_cast<StartStopAllStreams *>(clause) ||
dynamic_cast<TestStream *>(clause)) {
// If there is stream clause then there shouldn't be anything else.
if (single_query->clauses_.size() != 1U) {
throw SemanticException(
@ -238,6 +239,10 @@ antlrcpp::Any CypherMainVisitor::visitClause(CypherParser::ClauseContext *ctx) {
return static_cast<Clause *>(
ctx->startStopAllStreams()->accept(this).as<StartStopAllStreams *>());
}
if (ctx->testStream()) {
return static_cast<Clause *>(
ctx->testStream()->accept(this).as<TestStream *>());
}
// TODO: implement other clauses.
throw utils::NotYetImplemented("clause '{}'", ctx->getText());
return 0;
@ -440,6 +445,21 @@ antlrcpp::Any CypherMainVisitor::visitCypherReturn(
return return_clause;
}
/**
* @return TestStream*
*/
antlrcpp::Any CypherMainVisitor::visitTestStream(
CypherParser::TestStreamContext *ctx) {
std::string stream_name(std::string(ctx->streamName()->getText()));
Expression *limit_batches = nullptr;
if (ctx->limitBatchesOption()) {
limit_batches = ctx->limitBatchesOption()->accept(this);
}
return storage_.Create<TestStream>(stream_name, limit_batches);
}
antlrcpp::Any CypherMainVisitor::visitReturnBody(
CypherParser::ReturnBodyContext *ctx) {
ReturnBody body;

View File

@ -224,6 +224,11 @@ class CypherMainVisitor : public antlropencypher::CypherBaseVisitor {
antlrcpp::Any visitLimitBatchesOption(
CypherParser::LimitBatchesOptionContext *ctx) override;
/**
* @return TestStream*
*/
antlrcpp::Any visitTestStream(CypherParser::TestStreamContext *ctx) override;
/**
* @return Return*
*/

View File

@ -54,6 +54,7 @@ clause : cypherMatch
| showStreams
| startStopStream
| startStopAllStreams
| testStream
;
cypherMatch : ( OPTIONAL SP )? MATCH SP? pattern ( SP? where )? ;
@ -318,6 +319,8 @@ limitBatchesOption : LIMIT SP limitBatches=literal SP BATCHES ;
startStopAllStreams : ( START | STOP ) SP ALL SP STREAMS ;
testStream : K_TEST SP STREAM SP streamName ( SP limitBatchesOption )? ;
HexInteger : '0x' ( HexDigit )+ ;
DecimalInteger : ZeroDigit
@ -577,6 +580,8 @@ BATCHES : ( 'B' | 'b' ) ( 'A' | 'a' ) ( 'T' | 't' ) ( 'C' | 'c' ) ( 'H' | 'h' )
TOPIC : ( 'T' | 't' ) ( 'O' | 'o' ) ( 'P' | 'p' ) ( 'I' | 'i' ) ( 'C' | 'c' ) ;
K_TEST : ( 'T' | 't' ) ( 'E' | 'e' ) ( 'S' | 's' ) ( 'T' | 't' ) ;
UnescapedSymbolicName : IdentifierStart ( IdentifierPart )* ;
/**

View File

@ -234,6 +234,8 @@ bool SymbolGenerator::Visit(StartStopStream &) { return true; }
bool SymbolGenerator::Visit(StartStopAllStreams &) { return true; }
bool SymbolGenerator::Visit(TestStream &) { return true; }
// Expressions
SymbolGenerator::ReturnType SymbolGenerator::Visit(Identifier &ident) {

View File

@ -54,6 +54,7 @@ class SymbolGenerator : public HierarchicalTreeVisitor {
bool Visit(ShowStreams &) override;
bool Visit(StartStopStream &) override;
bool Visit(StartStopAllStreams &) override;
bool Visit(TestStream &) override;
// Expressions
ReturnType Visit(Identifier &) override;

View File

@ -89,7 +89,8 @@ const trie::Trie kKeywords = {
"extract", "any", "none", "single", "true", "false",
"reduce", "user", "password", "alter", "drop", "stream",
"streams", "load", "data", "kafka", "transform", "batch",
"interval", "show", "start", "stop", "size", "topic"};
"interval", "show", "start", "stop", "size", "topic",
"test"};
// Unicode codepoints that are allowed at the start of the unescaped name.
const std::bitset<kBitsetSize> kUnescapedNameAllowedStarts(std::string(

View File

@ -56,6 +56,7 @@ class ExpressionEvaluator : public TreeVisitor<TypedValue> {
BLOCK_VISIT(ShowStreams);
BLOCK_VISIT(StartStopStream);
BLOCK_VISIT(StartStopAllStreams);
BLOCK_VISIT(TestStream);
#undef BLOCK_VISIT

View File

@ -192,6 +192,7 @@ class CostEstimator : public HierarchicalLogicalOperatorVisitor {
bool Visit(ShowStreams &) override { return true; }
bool Visit(StartStopStream &) override { return true; }
bool Visit(StartStopAllStreams &) override { return true; }
bool Visit(TestStream &) override { return true; }
// TODO: Cost estimate PullRemote and ProduceRemote?

View File

@ -82,6 +82,7 @@ class IndependentSubtreeFinder : public HierarchicalLogicalOperatorVisitor {
bool Visit(ShowStreams &) override { return true; }
bool Visit(StartStopStream &) override { return true; }
bool Visit(StartStopAllStreams &) override { return true; }
bool Visit(TestStream &) override { return true; }
bool PreVisit(ScanAll &scan) override {
prev_ops_.push_back(&scan);
@ -1222,6 +1223,8 @@ class DistributedPlanner : public HierarchicalLogicalOperatorVisitor {
bool Visit(StartStopAllStreams &) override { return true; }
bool Visit(TestStream &) override { return true; }
// Accumulate is used only if the query performs any writes. In such a case,
// we need to synchronize the work done on master and all workers.
// Synchronization will force applying changes to distributed storage, and

View File

@ -4084,9 +4084,17 @@ class StartStopStreamCursor : public Cursor {
throw StreamClauseInMulticommandTxException();
}
ExpressionEvaluator evaluator(frame, &ctx, GraphView::OLD);
std::experimental::optional<int64_t> limit_batches;
if (self_.limit_batches()) {
limit_batches = self_.limit_batches()->Accept(evaluator).Value<int64_t>();
}
try {
if (self_.is_start()) {
db_.db().kafka_streams().StartStream(self_.stream_name());
db_.db().kafka_streams().StartStream(self_.stream_name(),
limit_batches);
} else {
db_.db().kafka_streams().StopStream(self_.stream_name());
}
@ -4151,6 +4159,67 @@ std::unique_ptr<Cursor> StartStopAllStreams::MakeCursor(
return std::make_unique<StartStopAllStreamsCursor>(*this, db);
}
TestStream::TestStream(std::string stream_name, Expression *limit_batches,
Symbol test_result_symbol)
: stream_name_(stream_name),
limit_batches_(limit_batches),
test_result_symbol_(test_result_symbol) {}
WITHOUT_SINGLE_INPUT(TestStream)
class TestStreamCursor : public Cursor {
public:
TestStreamCursor(const TestStream &self, database::GraphDbAccessor &db)
: self_(self), db_(db) {}
bool Pull(Frame &frame, Context &ctx) override {
if (ctx.in_explicit_transaction_) {
throw StreamClauseInMulticommandTxException();
}
if (!is_initialized_) {
ExpressionEvaluator evaluator(frame, &ctx, GraphView::OLD);
std::experimental::optional<int64_t> limit_batches;
if (self_.limit_batches()) {
limit_batches =
self_.limit_batches()->Accept(evaluator).Value<int64_t>();
}
try {
results_ = db_.db().kafka_streams().TestStream(self_.stream_name(),
limit_batches);
} catch (const KafkaStreamException &e) {
throw QueryRuntimeException(e.what());
}
results_it_ = results_.begin();
is_initialized_ = true;
}
if (results_it_ == results_.end()) return false;
frame[self_.test_result_symbol()] = *results_it_;
results_it_++;
return true;
}
void Reset() override { throw utils::NotYetImplemented("Test Stream"); }
private:
const TestStream &self_;
database::GraphDbAccessor &db_;
bool is_initialized_ = false;
std::vector<std::string> results_;
std::vector<std::string>::iterator results_it_ = results_.begin();
};
std::unique_ptr<Cursor> TestStream::MakeCursor(
database::GraphDbAccessor &db) const {
return std::make_unique<TestStreamCursor>(*this, db);
}
} // namespace query::plan
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::Once);
@ -4196,3 +4265,4 @@ BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::DropStream);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::ShowStreams);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::StartStopStream);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::StartStopAllStreams);
BOOST_CLASS_EXPORT_IMPLEMENT(query::plan::TestStream);

View File

@ -116,6 +116,7 @@ class DropStream;
class ShowStreams;
class StartStopStream;
class StartStopAllStreams;
class TestStream;
using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
Once, CreateNode, CreateExpand, ScanAll, ScanAllByLabel,
@ -130,7 +131,7 @@ using LogicalOperatorCompositeVisitor = ::utils::CompositeVisitor<
using LogicalOperatorLeafVisitor =
::utils::LeafVisitor<Once, CreateIndex, ModifyUser, DropUser,
CreateStream, DropStream, ShowStreams,
StartStopStream, StartStopAllStreams>;
StartStopStream, StartStopAllStreams, TestStream>;
/**
* @brief Base class for hierarhical visitors of @c LogicalOperator class
@ -2689,6 +2690,40 @@ stream is importing.")
#>cpp StartStopAllStreams() {} cpp<#)
(:serialize :boost :capnp))
(lcp:define-class test-stream (logical-operator)
((stream-name "std::string" :reader t)
(limit-batches "Expression *"
:reader t
:save-fun #'save-pointer
:load-fun #'load-pointer
:capnp-type "Ast.Tree"
:capnp-init nil
:capnp-save #'save-ast-pointer
:capnp-load (load-ast-pointer "Expression *"))
(test-result-symbol "Symbol" :reader t))
(:documentation
"Test a stream. This will start consuming messages but wont insert anything
in the db.")
(:public
#>cpp
TestStream(std::string stream_name, Expression *limit_batches,
Symbol test_result_symbol);
DEFVISITABLE(HierarchicalLogicalOperatorVisitor);
std::unique_ptr<Cursor> MakeCursor(
database::GraphDbAccessor &db) const override;
std::vector<Symbol> ModifiedSymbols(const SymbolTable &) const override {
return {};
}
bool HasSingleInput() const override;
std::shared_ptr<LogicalOperator> input() const override;
void set_input(std::shared_ptr<LogicalOperator>) override;
cpp<#)
(:private
#>cpp TestStream() {} cpp<#)
(:serialize :boost :capnp))
(lcp:pop-namespace) ;; plan
(lcp:pop-namespace) ;; query
@ -2735,4 +2770,5 @@ BOOST_CLASS_EXPORT_KEY(query::plan::DropStream);
BOOST_CLASS_EXPORT_KEY(query::plan::ShowStreams);
BOOST_CLASS_EXPORT_KEY(query::plan::StartStopStream);
BOOST_CLASS_EXPORT_KEY(query::plan::StartStopAllStreams);
BOOST_CLASS_EXPORT_KEY(query::plan::TestStream);
cpp<#

View File

@ -60,6 +60,7 @@ class UsedSymbolsCollector : public HierarchicalTreeVisitor {
bool Visit(query::ShowStreams &) override { return true; }
bool Visit(query::StartStopStream &) override { return true; }
bool Visit(query::StartStopAllStreams &) override { return true; }
bool Visit(query::TestStream &) override { return true; }
std::unordered_set<Symbol> symbols_;
const SymbolTable &symbol_table_;

View File

@ -437,6 +437,11 @@ class ReturnBodyContext : public HierarchicalTreeVisitor {
return true;
}
bool Visit(query::TestStream &) override {
has_aggregation_.emplace_back(false);
return true;
}
// Creates NamedExpression with an Identifier for each user declared symbol.
// This should be used when body.all_identifiers is true, to generate
// expressions for Produce operator.

View File

@ -224,6 +224,13 @@ class RuleBasedPlanner {
DCHECK(!input_op) << "Unexpected operator before StartStopAllStreams";
input_op = std::make_unique<plan::StartStopAllStreams>(
start_stop_all_streams->is_start_);
} else if (auto *test_stream =
dynamic_cast<query::TestStream *>(clause)) {
DCHECK(!input_op) << "Unexpected operator before TestStream";
auto &symbol_table = context.symbol_table;
input_op = std::make_unique<plan::TestStream>(
test_stream->stream_name_, test_stream->limit_batches_,
symbol_table.CreateSymbol("test result", false));
} else {
throw utils::NotYetImplemented("clause conversion to operator(s)");
}

View File

@ -549,6 +549,11 @@ class PlanPrinter : public query::plan::HierarchicalLogicalOperatorVisitor {
return true;
}
bool Visit(query::plan::TestStream &op) override {
WithPrintLn([](auto &out) { out << "* TestStream"; });
return true;
}
bool PreVisit(query::plan::PullRemote &op) override {
WithPrintLn([&op](auto &out) {
out << "* PullRemote [" << op.plan_id() << "] {";

View File

@ -2182,4 +2182,43 @@ TYPED_TEST(CypherMainVisitorTest, StartStopAllStreams) {
SyntaxException);
}
TYPED_TEST(CypherMainVisitorTest, TestStream) {
auto check_test_stream = [](
std::string input, const std::string &stream_name,
std::experimental::optional<int64_t> limit_batches) {
TypeParam ast_generator(input);
auto *query = ast_generator.query_;
ASSERT_TRUE(query->single_query_);
auto *single_query = query->single_query_;
ASSERT_EQ(single_query->clauses_.size(), 1U);
auto *test_stream = dynamic_cast<TestStream *>(single_query->clauses_[0]);
EXPECT_TRUE(test_stream);
EXPECT_EQ(test_stream->stream_name_, stream_name);
if (limit_batches) {
ASSERT_TRUE(test_stream->limit_batches_);
CheckLiteral(ast_generator.context_, test_stream->limit_batches_,
TypedValue(*limit_batches));
} else {
EXPECT_EQ(test_stream->limit_batches_, nullptr);
}
};
check_test_stream("TesT STreaM strim", "strim", std::experimental::nullopt);
check_test_stream("tESt STreAM strim LimIT 10 BATchES", "strim", 10);
check_test_stream("Test StrEAM strim", "strim", std::experimental::nullopt);
EXPECT_THROW(check_test_stream("tEST STReaM 'strim'", "strim",
std::experimental::nullopt),
SyntaxException);
EXPECT_THROW(
check_test_stream("test STReaM strim LImiT 'dva' BATCheS", "strim", 2),
SyntaxException);
EXPECT_THROW(check_test_stream("test STreAM 'strim'", "strim",
std::experimental::nullopt),
SyntaxException);
}
} // namespace

View File

@ -587,8 +587,10 @@ auto GetMerge(AstStorage &storage, Pattern *pattern, OnMatch on_match,
storage.Create<query::DropStream>((stream_name))
#define SHOW_STREAMS storage.Create<query::ShowStreams>()
#define START_STREAM(stream_name, limit_batches) \
storage.Create<query::StartStopStream>((stream_name), true, limit_batches)
storage.Create<query::StartStopStream>((stream_name), true, (limit_batches))
#define STOP_STREAM(stream_name) \
storage.Create<query::StartStopStream>((stream_name), false, nullptr)
#define START_ALL_STREAMS storage.Create<query::StartStopAllStreams>(true)
#define STOP_ALL_STREAMS storage.Create<query::StartStopAllStreams>(false)
#define TEST_STREAM(stream_name, limit_batches) \
storage.Create<query::TestStream>((stream_name), (limit_batches))

View File

@ -139,14 +139,10 @@ class PlanChecker : public HierarchicalLogicalOperatorVisitor {
VISIT(CreateStream);
VISIT(DropStream);
bool Visit(ShowStreams &op) override {
CheckOp(op);
return true;
}
VISIT(ShowStreams);
VISIT(StartStopStream);
VISIT(StartStopAllStreams);
VISIT(TestStream);
#undef PRE_VISIT
#undef VISIT
@ -585,6 +581,28 @@ class ExpectStartStopAllStreams : public OpChecker<StartStopAllStreams> {
bool is_start_;
};
class ExpectTestStream : public OpChecker<TestStream> {
public:
ExpectTestStream(std::string stream_name, query::Expression *limit_batches)
: stream_name_(stream_name), limit_batches_(limit_batches) {}
void ExpectOp(TestStream &test_stream, const SymbolTable &) override {
EXPECT_EQ(test_stream.stream_name(), stream_name_);
// TODO: Proper expression equality
if (limit_batches_ && test_stream.limit_batches()) {
EXPECT_EQ(typeid(test_stream.limit_batches()).hash_code(),
typeid(limit_batches_).hash_code());
} else {
EXPECT_TRUE(limit_batches_ == nullptr &&
test_stream.limit_batches() == nullptr);
}
}
private:
std::string stream_name_;
query::Expression *limit_batches_;
};
auto MakeSymbolTable(query::Query &query) {
SymbolTable symbol_table;
SymbolGenerator symbol_generator(symbol_table);
@ -2515,6 +2533,31 @@ TYPED_TEST(TestPlanner, StartStopAllStreams) {
}
}
TYPED_TEST(TestPlanner, TestStream) {
std::string stream_name("kafka");
{
FakeDbAccessor dba;
AstStorage storage;
QUERY(SINGLE_QUERY(TEST_STREAM(stream_name, nullptr)));
auto expected = ExpectTestStream(stream_name, nullptr);
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed =
ExpectDistributed(MakeCheckers(ExpectTestStream(stream_name, nullptr)));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
{
FakeDbAccessor dba;
AstStorage storage;
auto limit_batches = LITERAL(10);
QUERY(SINGLE_QUERY(TEST_STREAM(stream_name, limit_batches)));
auto expected = ExpectTestStream(stream_name, limit_batches);
CheckPlan<TypeParam>(storage, expected);
auto expected_distributed = ExpectDistributed(
MakeCheckers(ExpectTestStream(stream_name, limit_batches)));
CheckDistributedPlan<TypeParam>(storage, expected_distributed);
}
}
TYPED_TEST(TestPlanner, DistributedAvg) {
// Test MATCH (n) RETURN AVG(n.prop) AS res
AstStorage storage;