diff --git a/docs/feature_specs/kafka/extractor.md b/docs/feature_specs/kafka/extractor.md
new file mode 100644
index 000000000..2d1009e6c
--- /dev/null
+++ b/docs/feature_specs/kafka/extractor.md

# Kafka - data extractor

The data extractor is responsible for loading data from Kafka. In order to do
so, it needs to know the URI of the Kafka leader broker. Once the extractor
connects to Kafka, it starts importing data.

The data extractor depends on [cppkafka](https://github.com/mfontanini/cppkafka),
which reduces message consumption to just a few API calls, as shown
[here](https://github.com/mfontanini/cppkafka/wiki/Consuming-messages).

Other metadata can also be passed to the data extractor; these options are
defined with our [extension](opencypher.md) of openCypher.

A full list of configurable metadata can be found
[here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).

Memgraph supports customizing the following:

 * `metadata.broker.list`, a required parameter, set by `KAFKA 'URI'`
 * `queue.buffering.max.ms`, set by `BATCH INTERVAL`

diff --git a/docs/feature_specs/kafka/opencypher.md b/docs/feature_specs/kafka/opencypher.md
new file mode 100644
index 000000000..22f9dd2a1
--- /dev/null
+++ b/docs/feature_specs/kafka/opencypher.md

# Kafka - openCypher clause

When importing data from Kafka, one must be able to specify the following:
* Kafka URI
* Transform [script](transform.md) URI

The Kafka endpoint is the URI of the leader broker and is required by the data
[extractor](extractor.md).

The minimum required syntax looks like this:
```opencypher
CREATE STREAM kafka_stream AS LOAD DATA KAFKA '127.0.0.1/topic' WITH TRANSFORM
'127.0.0.1/transform.py';
```

The `CREATE STREAM` clause is executed within a transaction.

The full openCypher clause for creating a stream is:
```opencypher
CREATE STREAM stream_name AS
  LOAD DATA KAFKA 'URI'
  WITH TRANSFORM 'URI'
  [BATCH INTERVAL milliseconds]
```

The `WITH TRANSFORM` parameter should contain the URI of the transform script.

The `BATCH INTERVAL` parameter defines the time interval, in milliseconds,
between two successive stream import operations.

The `DROP` clause deletes a stream:
```opencypher
DROP STREAM stream_name;
```

The `SHOW` clause lists all configured streams:
```opencypher
SHOW STREAMS;
```

You can also start and stop streams with the `START` and `STOP` clauses:
```opencypher
START STREAM stream_name [LIMIT count BATCHES];
STOP STREAM stream_name;
```

There are also convenience clauses to start and stop all streams:
```opencypher
START ALL STREAMS;
STOP ALL STREAMS;
```

diff --git a/docs/feature_specs/kafka/transform.md b/docs/feature_specs/kafka/transform.md
new file mode 100644
index 000000000..92b870c89
--- /dev/null
+++ b/docs/feature_specs/kafka/transform.md

# Kafka - data transform

The transform script is a user-defined script written in Python. The script
should be aware of the data format in the Kafka message.

Each Kafka message is byte length encoded, which means that the first eight
bytes of each message contain its length.

More on the message format can be found
[here](https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets).

The script will be embedded in our C++ codebase using Python's
[embedding](https://docs.python.org/3.5/extending/embedding.html) feature.
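As a rough illustration of that embedding, the C++ side could load the script
as a module and invoke a function on every raw message. The sketch below is
hypothetical: the module name `transform`, the function name `transform`, and
the calling convention are assumptions, not the actual interface, and error
handling is kept minimal.

```cpp
// Hypothetical sketch of driving a Python transform script from C++ with the
// CPython embedding API. Assumes transform.py is importable and exposes a
// function transform(message: bytes) -> str returning an openCypher query.
#define PY_SSIZE_T_CLEAN
#include <Python.h>

#include <iostream>
#include <stdexcept>
#include <string>

std::string RunTransform(const std::string &message) {
  PyObject *module = PyImport_ImportModule("transform");
  if (!module) throw std::runtime_error("transform.py not found on sys.path");

  PyObject *function = PyObject_GetAttrString(module, "transform");
  if (!function) throw std::runtime_error("transform function not found");

  // Hand the raw message over as a Python bytes object.
  PyObject *args = Py_BuildValue("(y#)", message.data(),
                                 static_cast<Py_ssize_t>(message.size()));
  PyObject *result = PyObject_CallObject(function, args);
  if (!result) throw std::runtime_error("transform call failed");

  const char *text = PyUnicode_AsUTF8(result);
  if (!text) throw std::runtime_error("transform did not return a string");
  std::string query = text;

  Py_DECREF(result);
  Py_DECREF(args);
  Py_DECREF(function);
  Py_DECREF(module);
  return query;
}

int main() {
  Py_Initialize();
  std::cout << RunTransform("v\t1") << "\n";
  Py_Finalize();
  return 0;
}
```

Whether the script exposes a function like this or, as in the sample below,
reads length-prefixed records from standard input is an interface detail;
either way it runs inside the embedded interpreter.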
A sample streaming transform script could look like this:

```python
import struct
import sys


def get_records():
    """Yield length-prefixed records read from standard input."""
    while True:
        # The first eight bytes of every message contain its length.
        message_len = sys.stdin.buffer.read(8)
        if len(message_len) == 8:
            message_len = struct.unpack("Q", message_len)[0]
            record = sys.stdin.buffer.read(message_len)
            yield record.decode("utf-8")
        else:
            # An empty read means the stream is exhausted; anything else is a
            # truncated length prefix.
            assert len(message_len) == 0, message_len
            return


def create_vertex(fields):
    return "CREATE (n:Node {{id: {}}})".format(fields[1])


def create_edge(fields):
    return "MATCH (n:Node {{id: {}}}) "\
           "MATCH (m:Node {{id: {}}}) "\
           "CREATE (n)-[e:Edge {{value: {}}}]->(m)"\
           .format(fields[1], fields[2], fields[3])


for record in get_records():
    fields = record.split("\t")
    if fields[0] == "v":
        print(create_vertex(fields))
    else:
        print(create_edge(fields))
```

The script should output openCypher query strings based on the type of the
records.
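For example, given the hypothetical tab-separated records `v\t1`, `v\t2` and
`e\t1\t2\t0.5` (where `\t` is a tab character), the script above would print:

```opencypher
CREATE (n:Node {id: 1})
CREATE (n:Node {id: 2})
MATCH (n:Node {id: 1}) MATCH (m:Node {id: 2}) CREATE (n)-[e:Edge {value: 0.5}]->(m)
```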