Kafka stream import feature spec
Summary: First version of the feature spec for importing streams of data using Kafka in Memgraph.

Reviewers: buda, teon.banek, dgleich, ipaljak
Reviewed By: buda
Subscribers: lion, mculinovic
Differential Revision: https://phabricator.memgraph.io/D1415

docs/feature_specs/kafka/extractor.md

# Kafka - data extractor

The data extractor is responsible for loading data from Kafka. In order to do
so, it needs to know the URI of the Kafka leader broker. Once the extractor
connects to Kafka, it starts importing data.

The data extractor depends on [cppkafka](https://github.com/mfontanini/cppkafka),
which makes message consumption a matter of just a few API calls, as seen
[here](https://github.com/mfontanini/cppkafka/wiki/Consuming-messages).
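
For illustration, a minimal consumption loop with cppkafka, loosely following
the linked example, could look like the sketch below. The broker address,
group id, and topic name are placeholders, not values mandated by this spec:

```cpp
#include <iostream>
#include <cppkafka/cppkafka.h>

int main() {
  // Placeholder broker and group id; in Memgraph these would come from
  // the stream definition rather than being hard-coded.
  cppkafka::Configuration config = {
      {"metadata.broker.list", "127.0.0.1:9092"},
      {"group.id", "memgraph"}};

  cppkafka::Consumer consumer(config);
  consumer.subscribe({"topic"});

  while (true) {
    // Poll a single message from the subscribed topic.
    cppkafka::Message message = consumer.poll();
    if (!message) continue;
    if (message.get_error()) {
      // rdkafka reports end of partition as an error; ignore it.
      if (!message.is_eof())
        std::cerr << message.get_error() << std::endl;
      continue;
    }
    // Hand the payload over to the rest of the import pipeline.
    std::cout << message.get_payload() << std::endl;
  }
}
```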

Other metadata, defined with our [extension](opencypher.md) of openCypher, can
also be passed to the data extractor.

A full list of configurable metadata can be found
[here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

Memgraph supports customizing the following:

* `metadata.broker.list`, a required parameter, set by `KAFKA 'URI'`
* `queue.buffering.max.ms`, set by `BATCH INTERVAL`
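
As a sketch, the mapping of these clause parameters onto librdkafka
configuration options could look like the following; the broker address and
the 100 ms interval are illustrative values only:

```cpp
#include <cppkafka/cppkafka.h>

// Hypothetical construction of the extractor's configuration from the
// parameters of a stream definition.
cppkafka::Configuration MakeConfig() {
  return cppkafka::Configuration{
      // Set by KAFKA 'URI' (required).
      {"metadata.broker.list", "127.0.0.1:9092"},
      // Set by BATCH INTERVAL, in milliseconds.
      {"queue.buffering.max.ms", "100"}};
}
```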

docs/feature_specs/kafka/opencypher.md

# Kafka - openCypher clause

One must be able to specify the following when importing data from Kafka:

* Kafka URI
* Transform [script](transform.md) URI

The Kafka endpoint is the URI of the leader broker, and it is required by the
data [extractor](extractor.md).

The minimum required syntax looks like:

```opencypher
CREATE STREAM kafka_stream AS LOAD DATA KAFKA '127.0.0.1/topic' WITH TRANSFORM
'127.0.0.1/transform.py';
```

The `CREATE STREAM` clause happens in a transaction.

The full openCypher clause for creating a stream is:

```opencypher
CREATE STREAM stream_name AS
LOAD DATA KAFKA 'URI'
WITH TRANSFORM 'URI'
[BATCH INTERVAL milliseconds]
```

The `WITH TRANSFORM` parameter should contain the URI of the transform script.

The `BATCH INTERVAL` parameter defines the time, in milliseconds, between two
successive stream import operations.

The `DROP` clause deletes a stream:

```opencypher
DROP STREAM stream_name;
```

The `SHOW` clause enables you to see all configured streams:

```opencypher
SHOW STREAMS;
```

You can also start and stop streams with the `START` and `STOP` clauses:

```opencypher
START STREAM stream_name [LIMIT count BATCHES];
STOP STREAM stream_name;
```

There are also convenience clauses to start and stop all streams:

```opencypher
START ALL STREAMS;
STOP ALL STREAMS;
```

docs/feature_specs/kafka/transform.md

# Kafka - data transform

The transform script is a user-defined script written in Python. The script
should be aware of the data format in the Kafka message.

Each Kafka message is byte length encoded, which means that the first eight
bytes of each message contain the length of the message.

More on the message format can be seen
[here](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets).
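
As an illustration of this framing, a hypothetical helper that hands one
message to the transform script could prefix the payload with its length as
follows. The big-endian byte order is an assumption here, chosen to match the
Kafka wire protocol:

```cpp
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper: write one length-prefixed message to the stream the
// transform script reads from, i.e. 8 length bytes followed by the payload.
void WriteFramedMessage(std::FILE *out, const std::string &payload) {
  uint64_t size = payload.size();
  // Emit the length as 8 raw bytes, most significant byte first.
  for (int shift = 56; shift >= 0; shift -= 8)
    std::fputc(static_cast<int>((size >> shift) & 0xFF), out);
  std::fwrite(payload.data(), 1, payload.size(), out);
  std::fflush(out);
}
```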

The script will be embedded in our C++ codebase using Python's
[embedding](https://docs.python.org/3.5/extending/embedding.html) feature.
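
A minimal sketch of that embedding, assuming the script has already been
fetched to a local file named `transform.py`, might look like:

```cpp
#include <Python.h>
#include <cstdio>

int main() {
  // Start the embedded Python interpreter inside the C++ process.
  Py_Initialize();

  // Run the user-supplied transform script; the file name here is a
  // placeholder for wherever the WITH TRANSFORM URI was downloaded to.
  std::FILE *script = std::fopen("transform.py", "r");
  if (script) {
    PyRun_SimpleFile(script, "transform.py");
    std::fclose(script);
  }

  // Tear the interpreter down again.
  Py_Finalize();
  return 0;
}
```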

Sample code for a streaming transform script could look like this:

```python
import struct
import sys


def get_records():
    """Yield length-prefixed records read from standard input."""
    while True:
        message_len = sys.stdin.buffer.read(8)
        if len(message_len) == 8:
            # The first eight bytes carry the payload length; big-endian
            # byte order is assumed, matching the Kafka protocol.
            message_len = struct.unpack("!Q", message_len)[0]
            record = sys.stdin.buffer.read(message_len)
            yield record.decode("utf-8")
        else:
            # A clean end of stream leaves nothing to read.
            assert len(message_len) == 0, message_len
            return


def create_vertex(fields):
    return "CREATE (n:Node {{id: {}}})".format(fields[1])


def create_edge(fields):
    return "MATCH (n:Node {{id: {}}}) "\
           "MATCH (m:Node {{id: {}}}) "\
           "CREATE (n)-[e:Edge {{value: {}}}]->(m)"\
           .format(fields[1], fields[2], fields[3])


for record in get_records():
    fields = record.split("\t")
    if fields[0] == "v":
        print(create_vertex(fields))
    else:
        print(create_edge(fields))
```

The script should output openCypher query strings based on the type of each
record.