Add kafka documentation

Summary:
Updated the feature specs, the changelog and added a new section in
user technical.

Reviewers: mferencevic, mculinovic, buda, ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1534
This commit is contained in:
Matija Santl 2018-07-19 15:19:31 +02:00
parent 705c43a816
commit 4ee3db80b0
11 changed files with 391 additions and 73 deletions

View File

@ -6,6 +6,10 @@
* Write-ahead log format changed (not backward compatible).
### Major Features and Improvements
* Kafka integration
## v0.12.0
### Breaking Changes

View File

@ -1,20 +0,0 @@
# Kafka - data extractor
The data extractor is responsible for loading data from Kafka. In order to do
so, it needs to know the URI of the Kafka leader broker. Once the extractor
connects to Kafka, it starts importing data.
Data extractor depends on [cppkafka](https://github.com/mfontanini/cppkafka)
which makes message consumption just a few API calls, as seen
[here](https://github.com/mfontanini/cppkafka/wiki/Consuming-messages).
There are also other metadata that can be passed to data extractor that are
defined with our [extension](opencypher.md) of openCypher.
A full list of configurable metadata can be found
[here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
Memgraph supports customizing the following:
* `metadata.broker.list` which is a required parameter, set by `KAFKA 'URI'`
* `queue.buffering.max.ms` set by `BATCH INTERVAL`

View File

@ -1,20 +1,19 @@
# Kafka - openCypher clause
One must be able to specify the following when importing data from Kafka:
* Kafka URI
* Kafka topic
* Transform [script](transform.md) URI
Kafka endpoint is the URI of the leader broker and it is required for data
[extractor](extractor.md).
Minimum required syntax looks like:
```opencypher
CREATE STREAM kafka_stream AS LOAD DATA KAFKA '127.0.0.1/topic' WITH TRANSFORM
'127.0.0.1/transform.py';
CREATE STREAM stream_name AS LOAD DATA KAFKA 'URI'
WITH TOPIC 'topic'
WITH TRANSFORM 'URI';
```
The `CREATE STREAM` clause happens in a transaction.
The full openCypher clause for creating a stream is:
```opencypher
@ -25,22 +24,23 @@ CREATE STREAM stream_name AS
[BATCH_INTERVAL milliseconds]
[BATCH_SIZE count]
```
The `CREATE STREAM` clause happens in a transaction.
The `WITH TOPIC` parameter specifies the kafka topic from which we'll stream
`WITH TOPIC` parameter specifies the Kafka topic from which we'll stream
data.
The `WITH TRANSFORM` parameter should contain a URI of the transform script.
`WITH TRANSFORM` parameter should contain a URI of the transform script.
The `BATCH_INTERVAL` parameter defines the time interval in milliseconds
that defines the time between two successive stream importing operations.
`BATCH_INTERVAL` parameter defines the time interval in milliseconds
which is the time between two successive stream importing operations.
The `BATCH_SIZE` parameter defines the count of kafka messages that will be
`BATCH_SIZE` parameter defines the count of Kafka messages that will be
batched together before import.
If both `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the condition
that is satisfied first will trigger the batched import.
Default values for `BATCH_INTERVAL` is 100 milliseconds, and the default value
Default value for `BATCH_INTERVAL` is 100 milliseconds, and the default value
for `BATCH_SIZE` is 10;
The `DROP` clause deletes a stream:
@ -68,7 +68,6 @@ START ALL STREAMS;
STOP ALL STREAMS;
```
Before the actual import, you can also test the stream with the `TEST
STREAM` clause:
```opencypher

View File

@ -6,46 +6,28 @@ should be aware of the data format in the Kafka message.
Each Kafka message is byte length encoded, which means that the first eight
bytes of each message contain the length of the message.
More on the message format can be seen
[here](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets).
The script will be embedded in our C++ codebase using pythons
[embedding](https://docs.python.org/3.5/extending/embedding.html) feature.
A sample code for a streaming transform script could look like this:
```python
import struct
import sys
def create_vertex(vertex_id):
return ("CREATE (:Node {id: $id})", {"id": vertex_id})
def get_records():
while True:
message_len = sys.stdin.read(8)
if len(message_len) == 8:
message_len = struct.unpack("L", message_len)[0]
record = sys.stdin.read(message_len)
yield record
def create_edge(from_id, to_id):
return ("MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "\
"CREATE (n)-[:Edge]->(m)", {"from_id": from_id, "to_id": to_id})
def stream(batch):
result = []
for item in batch:
message = item.decode('utf-8').strip().split()
if len(message) == 1:
result.append(create_vertex(message[0])))
else:
assert len(message_len) == 0, message_len
return
result.append(create_edge(message[0], message[1]))
return result
def create_vertex(fields):
return "CREATE (n:Node {{id: {}}})".format(fields[1])
def create_edge(fields):
return "MATCH (n:Node {{id: {}}}) "\
"MATCH ((m:Node {{id : {}}})) "\
"CREATE (n)-[e:Edge{{value: {}}}]->(m) "\
.format(fields[1], fields[2], fields[3])
for record in get_records():
fields = record.split("\t")
if fields[0] == "v":
return create_vertex(fields):
else:
return create_edge(fields)
```
The script should output openCypher query strings based on the type of the

View File

@ -15,6 +15,7 @@ data structures, multi-version concurrency control and asynchronous IO.
* [Examples](examples.md)
* [Drivers](drivers.md)
* [Data Storage](storage.md)
* [Integrations](integrations.md)
* [openCypher Query Language](open-cypher.md)
* [Import Tools](import-tools.md)
* [Concepts](concepts.md)

View File

@ -0,0 +1,220 @@
## Integrations
### Kafka
Apache Kafka is an open-source stream-processing software platform. The project
aims to provide a unified, high-throughput, low-latency platform for handling
real-time data feeds.
Memgraph offers easy data import at the source using Kafka as the
high-throughput messaging system.
#### openCypher
Memgraphs custom openCypher clause for creating a stream is:
```opencypher
CREATE STREAM stream_name AS
LOAD DATA KAFKA 'URI'
WITH TOPIC 'topic'
WITH TRANSFORM 'URI'
[BATCH_INTERVAL milliseconds]
[BATCH_SIZE count]
```
The `CREATE STREAM` clause happens in a transaction.
`WITH TOPIC` parameter specifies the Kafka topic from which we'll stream
data.
`WITH TRANSFORM` parameter should contain a URI of the transform script.
We cover more about the transform script later, in the [transform](#transform)
section.
`BATCH_INTERVAL` parameter defines the time interval in milliseconds
which is the time between two successive stream importing operations.
`BATCH_SIZE` parameter defines the count of Kafka messages that will be
batched together before import.
If both `BATCH_INTERVAL` and `BATCH_SIZE` parameters are given, the condition
that is satisfied first will trigger the batched import.
Default value for `BATCH_INTERVAL` is 100 milliseconds, and the default value
for `BATCH_SIZE` is 10.
The `DROP` clause deletes a stream:
```opencypher
DROP STREAM stream_name;
```
The `SHOW` clause enables you to see all configured streams:
```opencypher
SHOW STREAMS;
```
You can also start/stop streams with the `START` and `STOP` clauses:
```opencypher
START STREAM stream_name [LIMIT count BATCHES];
STOP STREAM stream_name;
```
A stream needs to be stopped in order to start it and it needs to be started in
order to stop it. Starting a started or stopping a stopped stream will not
affect that stream.
There are also convenience clauses to start and stop all streams:
```opencypher
START ALL STREAMS;
STOP ALL STREAMS;
```
Before the actual import, you can also test the stream with the `TEST
STREAM` clause:
```opencypher
TEST STREAM stream_name [LIMIT count BATCHES];
```
When a stream is tested, data extraction and transformation occurs, but nothing
is inserted into the graph.
A stream needs to be stopped in order to test it. When the batch limit is
omitted, `TEST STREAM` will run for only one batch by default.
#### Transform
The transform script allows Memgraph users to have custom Kafka messages and
still be able to import data in Memgraph by adding the logic to decode the
messages in the transform script.
The entry point of the transform script from Memgraph is the `stream` function.
Input for the `stream` function is a list of bytes that represent byte encoded
Kafka messages, and the output of the `stream` function must be a list of
tuples containing openCypher string queries and corresponding parameters stored
in a dictionary.
To be more precise, the signature of the `stream` function looks like the
following:
```plaintext
stream : [bytes] -> [(str, {str : type})]
type : none | bool | int | float | str | list | dict
```
An example of a simple transform script that creates vertices if the message
contains one number (the vertex id) or it creates edges if the message contains
two numbers (origin vertex id and destination vertex id) would look like the
following:
```python
def create_vertex(vertex_id):
return ("CREATE (:Node {id: $id})", {"id": vertex_id})
def create_edge(from_id, to_id):
return ("MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) "\
"CREATE (n)-[:Edge]->(m)", {"from_id": from_id, "to_id": to_id})
def stream(batch):
result = []
for item in batch:
message = item.decode('utf-8').split()
if len(message) == 1:
result.append(create_vertex(message[0]))
elif len(message) == 2:
result.append(create_edge(message[0], message[1]))
return result
```
#### Example
For this example, we assume you have a local instance of Kafka. You can find
more about running Kafka [here](https://kafka.apache.org/quickstart).
From this point forth, we assume you have a instance of Kafka running on
`localhost:9092` with a topic `test` and that you've started Memgraph and have
Memgraph client running.
Each Kafka stream in Memgraph requires a transform script written in `Python`
that knows how to interpret incoming data and transform the data to queries that
Memgraph understands. Lets assume you have script available on
`http://localhost/transform.py`.
Lets also assume the Kafka topic contains two types of messages:
* Node creation: the message contains a single number, the node id.
* Edge creation: the message contains two numbers, origin node id and
destination node id.
In order to create a stream input the following query in the client:
```opencypher
CREATE STREAM mystream AS LOAD DATA KAFKA 'localhost:9092' WITH TOPIC 'test' WITH
TRANSFORM 'http://localhost/transform.py'
```
This will create the stream inside Memgraph but will not start it yet. However,
if the Kafka instance isn't available on the given URI, or the topic doesn't
exist, the query will fail with an appropriate message.
E.g. if the transform script can't be found at the given URI, the following
error will be shown:
```plaintext
Client received exception: Couldn't get the transform script from http://localhost/transform.py
```
Similar, if the given Kafka topic doesn't exist, we'll get the following:
```plaintext
Client received exception: Kafka stream mystream, topic not found
```
After a successful stream creation, you can check the status of all streams by
executing:
```opencypher
SHOW STREAMS
```
This should produce the following output:
```plaintext
+----------+----------------+-------+------------------------------+---------+
| name | uri | topic | transform | status |
+---------------------------+--------------------------------------+---------+
| mystream | localhost:9092 | test | http://localhost/memgraph.py | stopped |
+----------+----------------+-------+------------------------------+---------+
```
As you can notice, the status of this stream is stopped.
In order to see if everything is correct, you can test the stream by executing:
```opencypher
TEST STREAM mystream;
```
This will ingest data from Kafka, but instead of writing it to Memgraph, it will
just output the result.
If the `test` Kafka topic would contain two messages, `1` and `1 2` the result
of the `TEST STREAM` query would look like:
```plaintext
+-------------------------------------------------------------------------------+-------------------------+
| query | params |
+-------------------------------------------------------------------------------+-------------------------+
| CREATE (:Node {id: $id}) | {id:"1"} |
| MATCH (n:Node {id: $from_id}), (m:Node {id: $to_id}) CREATE (n)-[:Edge]->(m) | {from_id:"1",to_id:"2"} |
+-------------------------------------------------------------------------------+-------------------------+
```
To start ingesting data from a stream, you need to execute the following query:
```opencypher
START STREAM mystream;
```
If we check the stream status now, the output would look like this:
```plaintext
+----------+----------------+-------+------------------------------+---------+
| name | uri | topic | transform | status |
+---------------------------+--------------------------------------+---------+
| mystream | localhost:9092 | test | http://localhost/memgraph.py | running |
+----------+----------------+-------+------------------------------+---------+
```
To stop ingesting data, the stop stream query needs to be executed:
```opencypher
STOP STREAM mystream;
```
If Memgraph shuts down, all streams that existed before the shutdown are going
to be recovered.

View File

@ -325,7 +325,7 @@ StreamStatus Consumer::Status() {
return ret;
}
StreamInfo Consumer::info() {
StreamInfo Consumer::Info() {
info_.is_running = is_running_;
return info_;
}

View File

@ -1,3 +1,4 @@
/// @file
#pragma once
#include <atomic>
@ -16,6 +17,10 @@
namespace integrations {
namespace kafka {
/// StreamInfo holds all important info about a stream for memgraph.
///
/// The fields inside this struct are used for serialization and
/// deserialization.
struct StreamInfo {
std::string stream_name;
std::string stream_uri;
@ -29,6 +34,7 @@ struct StreamInfo {
bool is_running = false;
};
/// StreamStatus holds all important info about a stream for a user.
struct StreamStatus {
std::string stream_name;
std::string stream_uri;
@ -37,10 +43,24 @@ struct StreamStatus {
std::string stream_status;
};
/// Memgraphs kafka consumer wrapper.
///
/// Class Consumer wraps around librdkafka Consumer so it's easier to use it.
/// It extends RdKafka::EventCb in order to listen to error events.
class Consumer final : public RdKafka::EventCb {
public:
Consumer() = delete;
/// Creates a new consumer with the given parameters.
///
/// @param info necessary info about a stream
/// @param script_path path on the filesystem where the transform script
/// is stored
/// @param stream_writer custom lambda that knows how to write data to the
/// db
//
/// @throws ConsumerFailedToInitializeException if the consumer can't connect
/// to the Kafka endpoint.
Consumer(const StreamInfo &info, const std::string &transform_script_path,
std::function<
void(const std::string &,
@ -53,21 +73,48 @@ class Consumer final : public RdKafka::EventCb {
Consumer &operator=(const Consumer &other) = delete;
Consumer &operator=(Consumer &&other) = delete;
/// Starts importing data from a stream to the db.
/// This method will start a new thread which does the import.
///
/// @param limit_batches if present, the consumer will only import the given
/// number of batches in the db, and stop afterwards.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerRunningException if the consumer is already running
void Start(std::experimental::optional<int64_t> limit_batches);
/// Stops importing data from a stream to the db.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerStoppedException if the consumer is already stopped
void Stop();
/// Starts importing importing from a stream only if the stream is stopped.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StartIfStopped();
/// Stops importing from a stream only if the stream is running.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
void StopIfRunning();
/// Performs a dry-run on a given stream.
///
/// @param limit_batches the consumer will only test on the given number of
/// batches. If not present, a default value is used.
///
/// @throws ConsumerNotAvailableException if the consumer isn't initialized
/// @throws ConsumerRunningException if the consumer is alredy running.
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(std::experimental::optional<int64_t> limit_batches);
/// Returns the current status of a stream.
StreamStatus Status();
StreamInfo info();
/// Returns the info of a stream.
StreamInfo Info();
private:
StreamInfo info_;

View File

@ -195,7 +195,7 @@ void Streams::Start(const std::string &stream_name,
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.info()).dump())) {
Serialize(find_it->second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
@ -210,7 +210,7 @@ void Streams::Stop(const std::string &stream_name) {
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(stream_name,
Serialize(find_it->second.info()).dump())) {
Serialize(find_it->second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(stream_name);
}
}
@ -222,7 +222,7 @@ void Streams::StartAll() {
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.info()).dump())) {
Serialize(consumer_kv.second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}
@ -235,7 +235,7 @@ void Streams::StopAll() {
// Store stream_info in metadata_store_.
if (!metadata_store_.Put(consumer_kv.first,
Serialize(consumer_kv.second.info()).dump())) {
Serialize(consumer_kv.second.Info()).dump())) {
throw StreamMetadataCouldNotBeStored(consumer_kv.first);
}
}

View File

@ -1,3 +1,4 @@
/// @file
#pragma once
#include "integrations/kafka/consumer.hpp"
@ -10,32 +11,97 @@
namespace integrations::kafka {
/// Manages kafka consumers.
///
/// This class is responsible for all query supported actions to happen.
class Streams final {
public:
/// Initialize streams.
///
/// @param streams_directory path on the filesystem where the streams metadata
/// will be persisted and where the transform scripts will be
/// downloaded
/// @param stream_writer lambda that knows how to write data to the db
Streams(const std::string &streams_directory,
std::function<
void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer);
/// Looks for persisted metadata and tries to recover consumers.
///
/// @throws TransformScriptNotFoundException if the transform script is
// missing
/// @throws StreamDeserializationException if the metadata can't be recovered
void Recover();
/// Creates a new import stream.
/// This method makes sure there is no other stream with the same name,
/// downloads the given transform script and writes metadata to persisted
/// store.
///
/// @param info StreamInfo struct with necessary data for a kafka consumer.
/// @param download_transform_script Denote whether or not the transform
/// script should be downloaded.
///
/// @throws StreamExistsException if the stream with the same name exists
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
/// @throws TransformScriptCouldNotBeCreatedException if the script could not
/// be created
void Create(const StreamInfo &info, bool download_transform_script = true);
/// Deletes an existing stream and all the data that was persisted.
///
/// @param stream_name name of the stream that needs to be deleted.
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws StreamMetadataCouldNotBeDeleted if the persisted metadata can't be
/// delteed
/// @throws TransformScriptNotFoundException if the transform script can't be
/// deleted
void Drop(const std::string &stream_name);
/// Start consuming from a stream.
///
/// @param stream_name name of the stream we want to start consuming
/// @param batch_limit number of batches we want to import before stopping
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void Start(const std::string &stream_name,
std::experimental::optional<int64_t> batch_limit =
std::experimental::nullopt);
/// Stop consuming from a stream.
///
/// @param stream_name name of the stream we wanto to stop consuming
///
/// @throws StreamDoesntExistException if the stream doesn't exist
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void Stop(const std::string &stream_name);
/// Start consuming from all streams that are stopped.
///
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void StartAll();
/// Stop consuming from all streams that are running.
///
/// @throws StreamMetadataCouldNotBeStored if it can't persist metadata
void StopAll();
/// Return current status for all streams.
std::vector<StreamStatus> Show();
/// Do a dry-run consume from a stream.
///
/// @param stream_name name of the stream we want to test
/// @param batch_limit number of batches we want to test before stopping
///
/// @returns A vector of pairs consisting of the query (std::string) and its
/// parameters (std::map<std::string, communication::bolt::Value).
///
/// @throws StreamDoesntExistException if the stream doesn't exist
std::vector<
std::pair<std::string, std::map<std::string, communication::bolt::Value>>>
Test(const std::string &stream_name,
@ -44,10 +110,12 @@ class Streams final {
private:
std::string streams_directory_;
/// Custom lambda that "knows" how to execute queries.
std::function<void(const std::string &,
const std::map<std::string, communication::bolt::Value> &)>
stream_writer_;
/// Key value storage used as a persistent storage for stream metadata.
storage::KVStore metadata_store_;
std::mutex mutex_;

View File

@ -1,3 +1,4 @@
/// @file
#pragma once
#include <experimental/filesystem>
@ -16,15 +17,31 @@ struct TargetArguments {
int pipe_from_python{-1};
};
/// Wrapper around the transform script for a stream.
class Transform final {
private:
const int kStackSizeBytes = 262144;
public:
/// Download the transform script from the given URI and store it on the given
/// path.
///
/// @param transform_script_uri URI of the script
/// @param transform_script_path path on the filesystem where the script
/// will be stored
///
/// @throws TransformScriptDownloadException if it can't download the script
explicit Transform(const std::string &transform_script_path);
/// Starts the transform script.
///
/// @return bool True on success or False otherwise.
bool Start();
/// Transform the given batch of messages using the transform script.
///
/// @param batch kafka message batch
/// @return std::vector<std::string> transformed batch of kafka messages
void Apply(const std::vector<std::unique_ptr<RdKafka::Message>> &batch,
std::function<void(
const std::string &,